mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-02 04:50:38 +00:00
Compare commits
17 Commits
skyzh/page
...
devin/1746
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8ec905239b | ||
|
|
e32aceff16 | ||
|
|
486d9f0c4d | ||
|
|
000503b38a | ||
|
|
7a576d723c | ||
|
|
b419fa9e2f | ||
|
|
aa45bf998d | ||
|
|
f8dffb62cf | ||
|
|
ee7479cb68 | ||
|
|
62ac5b94b3 | ||
|
|
f0e7b3e0ef | ||
|
|
c6ff18affc | ||
|
|
16ca74a3f4 | ||
|
|
cb67f9a651 | ||
|
|
baf425a2cd | ||
|
|
0b243242df | ||
|
|
6131d86ec9 |
71
.github/workflows/benchmarking.yml
vendored
71
.github/workflows/benchmarking.yml
vendored
@@ -53,6 +53,77 @@ concurrency:
|
||||
cancel-in-progress: true
|
||||
|
||||
jobs:
|
||||
cleanup:
|
||||
runs-on: [ self-hosted, us-east-2, x64 ]
|
||||
container:
|
||||
image: ghcr.io/neondatabase/build-tools:pinned-bookworm
|
||||
credentials:
|
||||
username: ${{ github.actor }}
|
||||
password: ${{ secrets.GITHUB_TOKEN }}
|
||||
options: --init
|
||||
env:
|
||||
ORG_ID: org-solitary-dew-09443886
|
||||
LIMIT: 100
|
||||
SEARCH: "Created by actions/neon-project-create; GITHUB_RUN_ID"
|
||||
BASE_URL: https://console-stage.neon.build/api/v2
|
||||
DRY_RUN: "false" # Set to "true" to just test out the workflow
|
||||
|
||||
steps:
|
||||
- name: Harden the runner (Audit all outbound calls)
|
||||
uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0
|
||||
with:
|
||||
egress-policy: audit
|
||||
|
||||
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
|
||||
|
||||
- name: Cleanup inactive Neon projects left over from prior runs
|
||||
env:
|
||||
API_KEY: ${{ secrets.NEON_STAGING_API_KEY }}
|
||||
run: |
|
||||
set -euo pipefail
|
||||
|
||||
NOW=$(date -u +%s)
|
||||
DAYS_AGO=$((NOW - 5 * 86400))
|
||||
|
||||
REQUEST_URL="$BASE_URL/projects?limit=$LIMIT&search=$(printf '%s' "$SEARCH" | jq -sRr @uri)&org_id=$ORG_ID"
|
||||
|
||||
echo "Requesting project list from:"
|
||||
echo "$REQUEST_URL"
|
||||
|
||||
response=$(curl -s -X GET "$REQUEST_URL" \
|
||||
--header "Accept: application/json" \
|
||||
--header "Content-Type: application/json" \
|
||||
--header "Authorization: Bearer ${API_KEY}" )
|
||||
|
||||
echo "Response:"
|
||||
echo "$response" | jq .
|
||||
|
||||
projects_to_delete=$(echo "$response" | jq --argjson cutoff "$DAYS_AGO" '
|
||||
.projects[]
|
||||
| select(.compute_last_active_at != null)
|
||||
| select((.compute_last_active_at | fromdateiso8601) < $cutoff)
|
||||
| {id, name, compute_last_active_at}
|
||||
')
|
||||
|
||||
if [ -z "$projects_to_delete" ]; then
|
||||
echo "No projects eligible for deletion."
|
||||
exit 0
|
||||
fi
|
||||
|
||||
echo "Projects that will be deleted:"
|
||||
echo "$projects_to_delete" | jq -r '.id'
|
||||
|
||||
if [ "$DRY_RUN" = "false" ]; then
|
||||
echo "$projects_to_delete" | jq -r '.id' | while read -r project_id; do
|
||||
echo "Deleting project: $project_id"
|
||||
curl -s -X DELETE "$BASE_URL/projects/$project_id" \
|
||||
--header "Accept: application/json" \
|
||||
--header "Content-Type: application/json" \
|
||||
--header "Authorization: Bearer ${API_KEY}"
|
||||
done
|
||||
else
|
||||
echo "Dry run enabled — no projects were deleted."
|
||||
fi
|
||||
bench:
|
||||
if: ${{ github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null }}
|
||||
permissions:
|
||||
|
||||
@@ -16,6 +16,7 @@ pub struct Collector {
|
||||
const NMETRICS: usize = 2;
|
||||
|
||||
static CLK_TCK_F64: Lazy<f64> = Lazy::new(|| {
|
||||
// SAFETY: libc::sysconf is safe, it merely returns a value.
|
||||
let long = unsafe { libc::sysconf(libc::_SC_CLK_TCK) };
|
||||
if long == -1 {
|
||||
panic!("sysconf(_SC_CLK_TCK) failed");
|
||||
|
||||
@@ -2469,6 +2469,7 @@ async fn timeline_checkpoint_handler(
|
||||
.map_err(|e|
|
||||
match e {
|
||||
CompactionError::ShuttingDown => ApiError::ShuttingDown,
|
||||
CompactionError::Cancelled => ApiError::ShuttingDown,
|
||||
CompactionError::Offload(e) => ApiError::InternalServerError(anyhow::anyhow!(e)),
|
||||
CompactionError::CollectKeySpaceError(e) => ApiError::InternalServerError(anyhow::anyhow!(e)),
|
||||
CompactionError::Other(e) => ApiError::InternalServerError(e),
|
||||
|
||||
@@ -3198,6 +3198,7 @@ impl TenantShard {
|
||||
match err {
|
||||
err if err.is_cancel() => {}
|
||||
CompactionError::ShuttingDown => (),
|
||||
CompactionError::Cancelled => (),
|
||||
// Offload failures don't trip the circuit breaker, since they're cheap to retry and
|
||||
// shouldn't block compaction.
|
||||
CompactionError::Offload(_) => {}
|
||||
|
||||
@@ -90,10 +90,18 @@ impl Header {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum BlobWriterError {
|
||||
#[error("cancelled")]
|
||||
Cancelled,
|
||||
#[error(transparent)]
|
||||
Other(anyhow::Error),
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum WriteBlobError {
|
||||
#[error(transparent)]
|
||||
Flush(FlushTaskError),
|
||||
Flush(BlobWriterError),
|
||||
#[error("blob too large ({len} bytes)")]
|
||||
BlobTooLarge { len: usize },
|
||||
#[error(transparent)]
|
||||
@@ -238,14 +246,16 @@ where
|
||||
cancel: CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
flush_task_span: tracing::Span,
|
||||
) -> anyhow::Result<Self> {
|
||||
) -> Result<Self, BlobWriterError> {
|
||||
let gate_token = gate.enter().map_err(|_| BlobWriterError::Cancelled)?;
|
||||
|
||||
Ok(Self {
|
||||
io_buf: Some(BytesMut::new()),
|
||||
writer: BufferedWriter::new(
|
||||
file,
|
||||
start_offset,
|
||||
|| IoBufferMut::with_capacity(Self::CAPACITY),
|
||||
gate.enter()?,
|
||||
gate_token,
|
||||
cancel,
|
||||
ctx,
|
||||
flush_task_span,
|
||||
@@ -265,13 +275,16 @@ where
|
||||
&mut self,
|
||||
src_buf: FullSlice<Buf>,
|
||||
ctx: &RequestContext,
|
||||
) -> (FullSlice<Buf>, Result<(), FlushTaskError>) {
|
||||
) -> (FullSlice<Buf>, Result<(), BlobWriterError>) {
|
||||
let res = self
|
||||
.writer
|
||||
// TODO: why are we taking a FullSlice if we're going to pass a borrow downstack?
|
||||
// Can remove all the complexity around owned buffers upstack
|
||||
.write_buffered_borrowed(&src_buf, ctx)
|
||||
.await
|
||||
.map_err(|e| match e {
|
||||
FlushTaskError::Cancelled => BlobWriterError::Cancelled,
|
||||
})
|
||||
.map(|len| {
|
||||
self.offset += len as u64;
|
||||
});
|
||||
@@ -418,8 +431,10 @@ where
|
||||
self,
|
||||
mode: BufferedWriterShutdownMode,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<W, FlushTaskError> {
|
||||
let (_, file) = self.writer.shutdown(mode, ctx).await?;
|
||||
) -> Result<W, BlobWriterError> {
|
||||
let (_, file) = self.writer.shutdown(mode, ctx).await.map_err(|e| match e {
|
||||
FlushTaskError::Cancelled => BlobWriterError::Cancelled,
|
||||
})?;
|
||||
Ok(file)
|
||||
}
|
||||
}
|
||||
@@ -467,8 +482,11 @@ pub(crate) mod tests {
|
||||
.await?,
|
||||
gate.enter()?,
|
||||
);
|
||||
let mut wtr =
|
||||
BlobWriter::new(file, 0, &gate, cancel.clone(), ctx, info_span!("test")).unwrap();
|
||||
let mut wtr = BlobWriter::new(file, 0, &gate, cancel.clone(), ctx, info_span!("test"))
|
||||
.map_err(|e| match e {
|
||||
BlobWriterError::Cancelled => anyhow::anyhow!("flush task cancelled"),
|
||||
BlobWriterError::Other(err) => err,
|
||||
})?;
|
||||
for blob in blobs.iter() {
|
||||
let (_, res) = if compression {
|
||||
let res = wtr
|
||||
@@ -490,7 +508,11 @@ pub(crate) mod tests {
|
||||
BufferedWriterShutdownMode::ZeroPadToNextMultiple(PAGE_SZ),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
.await
|
||||
.map_err(|e| match e {
|
||||
BlobWriterError::Cancelled => anyhow::anyhow!("flush task cancelled"),
|
||||
BlobWriterError::Other(err) => err,
|
||||
})?;
|
||||
file.disarm_into_inner()
|
||||
};
|
||||
Ok((temp_dir, pathbuf, offsets))
|
||||
|
||||
@@ -19,6 +19,14 @@ use crate::context::RequestContext;
|
||||
use crate::tenant::Timeline;
|
||||
use crate::tenant::storage_layer::Layer;
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum BatchSplitWriterError {
|
||||
#[error("cancelled")]
|
||||
Cancelled,
|
||||
#[error(transparent)]
|
||||
Other(anyhow::Error),
|
||||
}
|
||||
|
||||
pub(crate) enum BatchWriterResult {
|
||||
Produced(ResidentLayer),
|
||||
Discarded(PersistentLayerKey),
|
||||
@@ -97,7 +105,7 @@ impl BatchLayerWriter {
|
||||
self,
|
||||
tline: &Arc<Timeline>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Vec<ResidentLayer>> {
|
||||
) -> Result<Vec<ResidentLayer>, BatchSplitWriterError> {
|
||||
let res = self
|
||||
.finish_with_discard_fn(tline, ctx, |_| async { false })
|
||||
.await?;
|
||||
@@ -115,7 +123,7 @@ impl BatchLayerWriter {
|
||||
tline: &Arc<Timeline>,
|
||||
ctx: &RequestContext,
|
||||
discard_fn: D,
|
||||
) -> anyhow::Result<Vec<BatchWriterResult>>
|
||||
) -> Result<Vec<BatchWriterResult>, BatchSplitWriterError>
|
||||
where
|
||||
D: Fn(&PersistentLayerKey) -> F,
|
||||
F: Future<Output = bool>,
|
||||
@@ -139,14 +147,14 @@ impl BatchLayerWriter {
|
||||
generated_layers.push(BatchWriterResult::Discarded(layer_key));
|
||||
} else {
|
||||
let res = match inner {
|
||||
LayerWriterWrapper::Delta(writer) => {
|
||||
writer.finish(layer_key.key_range.end, ctx).await
|
||||
}
|
||||
LayerWriterWrapper::Image(writer) => {
|
||||
writer
|
||||
.finish_with_end_key(layer_key.key_range.end, ctx)
|
||||
.await
|
||||
}
|
||||
LayerWriterWrapper::Delta(writer) => writer
|
||||
.finish(layer_key.key_range.end, ctx)
|
||||
.await
|
||||
.map_err(|e| BatchSplitWriterError::Other(anyhow::anyhow!(e))),
|
||||
LayerWriterWrapper::Image(writer) => writer
|
||||
.finish_with_end_key(layer_key.key_range.end, ctx)
|
||||
.await
|
||||
.map_err(|e| BatchSplitWriterError::Other(anyhow::anyhow!(e))),
|
||||
};
|
||||
let layer = match res {
|
||||
Ok((desc, path)) => {
|
||||
@@ -155,7 +163,7 @@ impl BatchLayerWriter {
|
||||
Err(e) => {
|
||||
tokio::fs::remove_file(&path).await.ok();
|
||||
clean_up_layers(generated_layers);
|
||||
return Err(e);
|
||||
return Err(BatchSplitWriterError::Other(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -235,7 +243,7 @@ impl<'a> SplitImageLayerWriter<'a> {
|
||||
key: Key,
|
||||
img: Bytes,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> Result<(), BatchSplitWriterError> {
|
||||
// The current estimation is an upper bound of the space that the key/image could take
|
||||
// because we did not consider compression in this estimation. The resulting image layer
|
||||
// could be smaller than the target size.
|
||||
@@ -253,7 +261,8 @@ impl<'a> SplitImageLayerWriter<'a> {
|
||||
self.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
.await
|
||||
.map_err(|e| BatchSplitWriterError::Other(anyhow::anyhow!(e)))?;
|
||||
let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer);
|
||||
self.batches.add_unfinished_image_writer(
|
||||
prev_image_writer,
|
||||
@@ -262,7 +271,10 @@ impl<'a> SplitImageLayerWriter<'a> {
|
||||
);
|
||||
self.start_key = key;
|
||||
}
|
||||
self.inner.put_image(key, img, ctx).await
|
||||
self.inner
|
||||
.put_image(key, img, ctx)
|
||||
.await
|
||||
.map_err(|e| BatchSplitWriterError::Other(anyhow::anyhow!(e)))
|
||||
}
|
||||
|
||||
pub(crate) async fn finish_with_discard_fn<D, F>(
|
||||
@@ -271,7 +283,7 @@ impl<'a> SplitImageLayerWriter<'a> {
|
||||
ctx: &RequestContext,
|
||||
end_key: Key,
|
||||
discard_fn: D,
|
||||
) -> anyhow::Result<Vec<BatchWriterResult>>
|
||||
) -> Result<Vec<BatchWriterResult>, BatchSplitWriterError>
|
||||
where
|
||||
D: Fn(&PersistentLayerKey) -> F,
|
||||
F: Future<Output = bool>,
|
||||
@@ -291,7 +303,7 @@ impl<'a> SplitImageLayerWriter<'a> {
|
||||
tline: &Arc<Timeline>,
|
||||
ctx: &RequestContext,
|
||||
end_key: Key,
|
||||
) -> anyhow::Result<Vec<BatchWriterResult>> {
|
||||
) -> Result<Vec<BatchWriterResult>, BatchSplitWriterError> {
|
||||
self.finish_with_discard_fn(tline, ctx, end_key, |_| async { false })
|
||||
.await
|
||||
}
|
||||
@@ -346,7 +358,7 @@ impl<'a> SplitDeltaLayerWriter<'a> {
|
||||
lsn: Lsn,
|
||||
val: Value,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> Result<(), BatchSplitWriterError> {
|
||||
// The current estimation is key size plus LSN size plus value size estimation. This is not an accurate
|
||||
// number, and therefore the final layer size could be a little bit larger or smaller than the target.
|
||||
//
|
||||
@@ -366,7 +378,8 @@ impl<'a> SplitDeltaLayerWriter<'a> {
|
||||
self.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?,
|
||||
.await
|
||||
.map_err(|e| BatchSplitWriterError::Other(anyhow::anyhow!(e)))?,
|
||||
));
|
||||
}
|
||||
let (_, inner) = self.inner.as_mut().unwrap();
|
||||
@@ -386,7 +399,8 @@ impl<'a> SplitDeltaLayerWriter<'a> {
|
||||
self.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
.await
|
||||
.map_err(|e| BatchSplitWriterError::Other(anyhow::anyhow!(e)))?;
|
||||
let (start_key, prev_delta_writer) =
|
||||
self.inner.replace((key, next_delta_writer)).unwrap();
|
||||
self.batches.add_unfinished_delta_writer(
|
||||
@@ -396,16 +410,19 @@ impl<'a> SplitDeltaLayerWriter<'a> {
|
||||
);
|
||||
} else if inner.estimated_size() >= S3_UPLOAD_LIMIT {
|
||||
// We have to produce a very large file b/c a key is updated too often.
|
||||
anyhow::bail!(
|
||||
return Err(BatchSplitWriterError::Other(anyhow::anyhow!(
|
||||
"a single key is updated too often: key={}, estimated_size={}, and the layer file cannot be produced",
|
||||
key,
|
||||
inner.estimated_size()
|
||||
);
|
||||
)));
|
||||
}
|
||||
}
|
||||
self.last_key_written = key;
|
||||
let (_, inner) = self.inner.as_mut().unwrap();
|
||||
inner.put_value(key, lsn, val, ctx).await
|
||||
inner
|
||||
.put_value(key, lsn, val, ctx)
|
||||
.await
|
||||
.map_err(|e| BatchSplitWriterError::Other(anyhow::anyhow!(e)))
|
||||
}
|
||||
|
||||
pub(crate) async fn finish_with_discard_fn<D, F>(
|
||||
@@ -413,7 +430,7 @@ impl<'a> SplitDeltaLayerWriter<'a> {
|
||||
tline: &Arc<Timeline>,
|
||||
ctx: &RequestContext,
|
||||
discard_fn: D,
|
||||
) -> anyhow::Result<Vec<BatchWriterResult>>
|
||||
) -> Result<Vec<BatchWriterResult>, BatchSplitWriterError>
|
||||
where
|
||||
D: Fn(&PersistentLayerKey) -> F,
|
||||
F: Future<Output = bool>,
|
||||
@@ -439,7 +456,7 @@ impl<'a> SplitDeltaLayerWriter<'a> {
|
||||
self,
|
||||
tline: &Arc<Timeline>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Vec<BatchWriterResult>> {
|
||||
) -> Result<Vec<BatchWriterResult>, BatchSplitWriterError> {
|
||||
self.finish_with_discard_fn(tline, ctx, |_| async { false })
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -35,7 +35,9 @@ use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
|
||||
use anyhow::{Context, Result, bail, ensure};
|
||||
use crate::tenant::blob_io::BlobWriterError;
|
||||
|
||||
use anyhow::{Context, Result, bail};
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use futures::StreamExt;
|
||||
use itertools::Itertools;
|
||||
@@ -76,7 +78,7 @@ use crate::tenant::vectored_blob_io::{
|
||||
use crate::virtual_file::TempVirtualFile;
|
||||
use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
|
||||
use crate::virtual_file::owned_buffers_io::write::{Buffer, BufferedWriterShutdownMode};
|
||||
use crate::virtual_file::{self, IoBuffer, IoBufferMut, MaybeFatalIo, VirtualFile};
|
||||
use crate::virtual_file::{self, IoBuffer, IoBufferMut, VirtualFile};
|
||||
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
|
||||
|
||||
///
|
||||
@@ -448,7 +450,11 @@ impl DeltaLayerWriterInner {
|
||||
cancel,
|
||||
ctx,
|
||||
info_span!(parent: None, "delta_layer_writer_flush_task", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %path),
|
||||
)?;
|
||||
)
|
||||
.map_err(|e| match e {
|
||||
BlobWriterError::Cancelled => anyhow::anyhow!("flush task cancelled"),
|
||||
BlobWriterError::Other(err) => err,
|
||||
})?;
|
||||
|
||||
// Initialize the b-tree index builder
|
||||
let block_buf = BlockBuf::new();
|
||||
@@ -477,15 +483,12 @@ impl DeltaLayerWriterInner {
|
||||
lsn: Lsn,
|
||||
val: Value,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> Result<(), DeltaLayerWriterError> {
|
||||
let val_ser =
|
||||
Value::ser(&val).map_err(|e| DeltaLayerWriterError::Other(anyhow::Error::new(e)))?;
|
||||
|
||||
let (_, res) = self
|
||||
.put_value_bytes(
|
||||
key,
|
||||
lsn,
|
||||
Value::ser(&val)?.slice_len(),
|
||||
val.will_init(),
|
||||
ctx,
|
||||
)
|
||||
.put_value_bytes(key, lsn, val_ser.slice_len(), val.will_init(), ctx)
|
||||
.await;
|
||||
res
|
||||
}
|
||||
@@ -497,25 +500,46 @@ impl DeltaLayerWriterInner {
|
||||
val: FullSlice<Buf>,
|
||||
will_init: bool,
|
||||
ctx: &RequestContext,
|
||||
) -> (FullSlice<Buf>, anyhow::Result<()>)
|
||||
) -> (FullSlice<Buf>, Result<(), DeltaLayerWriterError>)
|
||||
where
|
||||
Buf: IoBuf + Send,
|
||||
{
|
||||
assert!(
|
||||
self.lsn_range.start <= lsn,
|
||||
"lsn_start={}, lsn={}",
|
||||
self.lsn_range.start,
|
||||
lsn
|
||||
);
|
||||
if self.lsn_range.start > lsn {
|
||||
return (
|
||||
val,
|
||||
Err(DeltaLayerWriterError::Other(anyhow::anyhow!(
|
||||
"lsn_start={}, lsn={}",
|
||||
self.lsn_range.start,
|
||||
lsn
|
||||
))),
|
||||
);
|
||||
}
|
||||
|
||||
// We don't want to use compression in delta layer creation
|
||||
let compression = ImageCompressionAlgorithm::Disabled;
|
||||
let (val, res) = self
|
||||
.blob_writer
|
||||
.write_blob_maybe_compressed(val, ctx, compression)
|
||||
.await;
|
||||
|
||||
let off = match res {
|
||||
Ok((off, _)) => off,
|
||||
Err(e) => return (val, Err(anyhow::anyhow!(e))),
|
||||
Err(e) => {
|
||||
return (
|
||||
val,
|
||||
Err(match e {
|
||||
crate::tenant::blob_io::WriteBlobError::Flush(blob_err) => match blob_err {
|
||||
crate::tenant::blob_io::BlobWriterError::Cancelled => {
|
||||
DeltaLayerWriterError::Cancelled
|
||||
}
|
||||
crate::tenant::blob_io::BlobWriterError::Other(err) => {
|
||||
DeltaLayerWriterError::Other(err)
|
||||
}
|
||||
},
|
||||
other => DeltaLayerWriterError::Other(anyhow::anyhow!(other)),
|
||||
}),
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
let blob_ref = BlobRef::new(off, will_init);
|
||||
@@ -525,7 +549,10 @@ impl DeltaLayerWriterInner {
|
||||
|
||||
self.num_keys += 1;
|
||||
|
||||
(val, res.map_err(|e| anyhow::anyhow!(e)))
|
||||
(
|
||||
val,
|
||||
res.map_err(|e| DeltaLayerWriterError::Other(anyhow::anyhow!(e))),
|
||||
)
|
||||
}
|
||||
|
||||
fn size(&self) -> u64 {
|
||||
@@ -539,7 +566,7 @@ impl DeltaLayerWriterInner {
|
||||
self,
|
||||
key_end: Key,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
|
||||
) -> Result<(PersistentLayerDesc, Utf8PathBuf), DeltaLayerWriterError> {
|
||||
let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32;
|
||||
|
||||
let file = self
|
||||
@@ -548,17 +575,24 @@ impl DeltaLayerWriterInner {
|
||||
BufferedWriterShutdownMode::ZeroPadToNextMultiple(PAGE_SZ),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
.await
|
||||
.map_err(|e| match e {
|
||||
BlobWriterError::Cancelled => DeltaLayerWriterError::Cancelled,
|
||||
BlobWriterError::Other(err) => DeltaLayerWriterError::Other(err),
|
||||
})?;
|
||||
|
||||
// Write out the index
|
||||
let (index_root_blk, block_buf) = self.tree.finish()?;
|
||||
let (index_root_blk, block_buf) = self
|
||||
.tree
|
||||
.finish()
|
||||
.map_err(|e| DeltaLayerWriterError::Other(anyhow::Error::new(e)))?;
|
||||
let mut offset = index_start_blk as u64 * PAGE_SZ as u64;
|
||||
|
||||
// TODO(yuchen): https://github.com/neondatabase/neon/issues/10092
|
||||
// Should we just replace BlockBuf::blocks with one big buffer
|
||||
for buf in block_buf.blocks {
|
||||
let (_buf, res) = file.write_all_at(buf.slice_len(), offset, ctx).await;
|
||||
res?;
|
||||
res.map_err(|e| DeltaLayerWriterError::Other(anyhow::Error::new(e)))?;
|
||||
offset += PAGE_SZ as u64;
|
||||
}
|
||||
assert!(self.lsn_range.start < self.lsn_range.end);
|
||||
@@ -575,24 +609,27 @@ impl DeltaLayerWriterInner {
|
||||
};
|
||||
|
||||
// Writes summary at the first block (offset 0).
|
||||
let buf = summary.ser_into_page()?;
|
||||
let buf = summary
|
||||
.ser_into_page()
|
||||
.map_err(|e| DeltaLayerWriterError::Other(anyhow::Error::new(e)))?;
|
||||
let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await;
|
||||
res?;
|
||||
res.map_err(|e| DeltaLayerWriterError::Other(anyhow::Error::new(e)))?;
|
||||
|
||||
let metadata = file
|
||||
.metadata()
|
||||
.await
|
||||
.context("get file metadata to determine size")?;
|
||||
.map_err(|e| DeltaLayerWriterError::Other(anyhow::Error::new(e)))?;
|
||||
|
||||
// 5GB limit for objects without multipart upload (which we don't want to use)
|
||||
// Make it a little bit below to account for differing GB units
|
||||
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html
|
||||
ensure!(
|
||||
metadata.len() <= S3_UPLOAD_LIMIT,
|
||||
"Created delta layer file at {} of size {} above limit {S3_UPLOAD_LIMIT}!",
|
||||
file.path(),
|
||||
metadata.len()
|
||||
);
|
||||
if metadata.len() > S3_UPLOAD_LIMIT {
|
||||
return Err(DeltaLayerWriterError::Other(anyhow::anyhow!(
|
||||
"Created delta layer file at {} of size {} above limit {S3_UPLOAD_LIMIT}!",
|
||||
file.path(),
|
||||
metadata.len()
|
||||
)));
|
||||
}
|
||||
|
||||
// Note: Because we opened the file in write-only mode, we cannot
|
||||
// reuse the same VirtualFile for reading later. That's why we don't
|
||||
@@ -609,7 +646,7 @@ impl DeltaLayerWriterInner {
|
||||
// fsync the file
|
||||
file.sync_all()
|
||||
.await
|
||||
.maybe_fatal_err("delta_layer sync_all")?;
|
||||
.map_err(|e| DeltaLayerWriterError::Other(anyhow::Error::new(e)))?;
|
||||
|
||||
trace!("created delta layer {}", self.path);
|
||||
|
||||
@@ -694,7 +731,7 @@ impl DeltaLayerWriter {
|
||||
lsn: Lsn,
|
||||
val: Value,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> Result<(), DeltaLayerWriterError> {
|
||||
self.inner
|
||||
.as_mut()
|
||||
.unwrap()
|
||||
@@ -709,7 +746,7 @@ impl DeltaLayerWriter {
|
||||
val: FullSlice<Buf>,
|
||||
will_init: bool,
|
||||
ctx: &RequestContext,
|
||||
) -> (FullSlice<Buf>, anyhow::Result<()>)
|
||||
) -> (FullSlice<Buf>, Result<(), DeltaLayerWriterError>)
|
||||
where
|
||||
Buf: IoBuf + Send,
|
||||
{
|
||||
@@ -731,7 +768,7 @@ impl DeltaLayerWriter {
|
||||
mut self,
|
||||
key_end: Key,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
|
||||
) -> Result<(PersistentLayerDesc, Utf8PathBuf), DeltaLayerWriterError> {
|
||||
self.inner.take().unwrap().finish(key_end, ctx).await
|
||||
}
|
||||
|
||||
@@ -745,6 +782,14 @@ impl DeltaLayerWriter {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum DeltaLayerWriterError {
|
||||
#[error("cancelled")]
|
||||
Cancelled,
|
||||
#[error(transparent)]
|
||||
Other(anyhow::Error),
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum RewriteSummaryError {
|
||||
#[error("magic mismatch")]
|
||||
@@ -755,7 +800,7 @@ pub enum RewriteSummaryError {
|
||||
|
||||
impl From<std::io::Error> for RewriteSummaryError {
|
||||
fn from(e: std::io::Error) -> Self {
|
||||
Self::Other(anyhow::anyhow!(e))
|
||||
Self::Other(anyhow::Error::new(e))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -33,7 +33,9 @@ use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
|
||||
use anyhow::{Context, Result, bail, ensure};
|
||||
use crate::tenant::blob_io::BlobWriterError;
|
||||
|
||||
use anyhow::{Context, Result, bail};
|
||||
use bytes::Bytes;
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use hex;
|
||||
@@ -74,7 +76,7 @@ use crate::tenant::vectored_blob_io::{
|
||||
use crate::virtual_file::TempVirtualFile;
|
||||
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
|
||||
use crate::virtual_file::owned_buffers_io::write::{Buffer, BufferedWriterShutdownMode};
|
||||
use crate::virtual_file::{self, IoBuffer, IoBufferMut, MaybeFatalIo, VirtualFile};
|
||||
use crate::virtual_file::{self, IoBuffer, IoBufferMut, VirtualFile};
|
||||
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
|
||||
|
||||
///
|
||||
@@ -340,6 +342,14 @@ impl ImageLayer {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum ImageLayerWriterError {
|
||||
#[error("flush task cancelled")]
|
||||
Cancelled,
|
||||
#[error(transparent)]
|
||||
Other(anyhow::Error),
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum RewriteSummaryError {
|
||||
#[error("magic mismatch")]
|
||||
@@ -814,7 +824,11 @@ impl ImageLayerWriterInner {
|
||||
cancel,
|
||||
ctx,
|
||||
info_span!(parent: None, "image_layer_writer_flush_task", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %path),
|
||||
)?;
|
||||
)
|
||||
.map_err(|e| match e {
|
||||
BlobWriterError::Cancelled => anyhow::anyhow!("flush task cancelled"),
|
||||
BlobWriterError::Other(err) => err,
|
||||
})?;
|
||||
|
||||
// Initialize the b-tree index builder
|
||||
let block_buf = BlockBuf::new();
|
||||
@@ -850,8 +864,13 @@ impl ImageLayerWriterInner {
|
||||
key: Key,
|
||||
img: Bytes,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
ensure!(self.key_range.contains(&key));
|
||||
) -> Result<(), ImageLayerWriterError> {
|
||||
if !self.key_range.contains(&key) {
|
||||
return Err(ImageLayerWriterError::Other(anyhow::anyhow!(
|
||||
"key not in range"
|
||||
)));
|
||||
}
|
||||
|
||||
let compression = self.conf.image_compression;
|
||||
let uncompressed_len = img.len() as u64;
|
||||
self.uncompressed_bytes += uncompressed_len;
|
||||
@@ -861,7 +880,18 @@ impl ImageLayerWriterInner {
|
||||
.write_blob_maybe_compressed(img.slice_len(), ctx, compression)
|
||||
.await;
|
||||
// TODO: re-use the buffer for `img` further upstack
|
||||
let (off, compression_info) = res?;
|
||||
let (off, compression_info) = res.map_err(|e| match e {
|
||||
crate::tenant::blob_io::WriteBlobError::Flush(blob_err) => match blob_err {
|
||||
crate::tenant::blob_io::BlobWriterError::Cancelled => {
|
||||
ImageLayerWriterError::Cancelled
|
||||
}
|
||||
crate::tenant::blob_io::BlobWriterError::Other(err) => {
|
||||
ImageLayerWriterError::Other(err)
|
||||
}
|
||||
},
|
||||
other => ImageLayerWriterError::Other(anyhow::anyhow!(other)),
|
||||
})?;
|
||||
|
||||
if compression_info.compressed_size.is_some() {
|
||||
// The image has been considered for compression at least
|
||||
self.uncompressed_bytes_eligible += uncompressed_len;
|
||||
@@ -873,7 +903,9 @@ impl ImageLayerWriterInner {
|
||||
|
||||
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
|
||||
key.write_to_byte_slice(&mut keybuf);
|
||||
self.tree.append(&keybuf, off)?;
|
||||
self.tree
|
||||
.append(&keybuf, off)
|
||||
.map_err(|e| ImageLayerWriterError::Other(anyhow::anyhow!(e)))?;
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
{
|
||||
@@ -893,8 +925,12 @@ impl ImageLayerWriterInner {
|
||||
key: Key,
|
||||
raw_with_header: Bytes,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
ensure!(self.key_range.contains(&key));
|
||||
) -> Result<(), ImageLayerWriterError> {
|
||||
if !self.key_range.contains(&key) {
|
||||
return Err(ImageLayerWriterError::Other(anyhow::anyhow!(
|
||||
"key not in range"
|
||||
)));
|
||||
}
|
||||
|
||||
// NB: we don't update the (un)compressed metrics, since we can't determine them without
|
||||
// decompressing the image. This seems okay.
|
||||
@@ -904,11 +940,23 @@ impl ImageLayerWriterInner {
|
||||
.blob_writer
|
||||
.write_blob_raw(raw_with_header.slice_len(), ctx)
|
||||
.await;
|
||||
let offset = res?;
|
||||
let offset = res.map_err(|e| match e {
|
||||
crate::tenant::blob_io::WriteBlobError::Flush(blob_err) => match blob_err {
|
||||
crate::tenant::blob_io::BlobWriterError::Cancelled => {
|
||||
ImageLayerWriterError::Cancelled
|
||||
}
|
||||
crate::tenant::blob_io::BlobWriterError::Other(err) => {
|
||||
ImageLayerWriterError::Other(err)
|
||||
}
|
||||
},
|
||||
other => ImageLayerWriterError::Other(anyhow::anyhow!(other)),
|
||||
})?;
|
||||
|
||||
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
|
||||
key.write_to_byte_slice(&mut keybuf);
|
||||
self.tree.append(&keybuf, offset)?;
|
||||
self.tree
|
||||
.append(&keybuf, offset)
|
||||
.map_err(|e| ImageLayerWriterError::Other(anyhow::anyhow!(e)))?;
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
{
|
||||
@@ -925,7 +973,7 @@ impl ImageLayerWriterInner {
|
||||
self,
|
||||
ctx: &RequestContext,
|
||||
end_key: Option<Key>,
|
||||
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
|
||||
) -> Result<(PersistentLayerDesc, Utf8PathBuf), ImageLayerWriterError> {
|
||||
let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32;
|
||||
|
||||
// Calculate compression ratio
|
||||
@@ -948,17 +996,24 @@ impl ImageLayerWriterInner {
|
||||
BufferedWriterShutdownMode::ZeroPadToNextMultiple(PAGE_SZ),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
.await
|
||||
.map_err(|e| match e {
|
||||
BlobWriterError::Cancelled => ImageLayerWriterError::Cancelled,
|
||||
BlobWriterError::Other(err) => ImageLayerWriterError::Other(err),
|
||||
})?;
|
||||
|
||||
// Write out the index
|
||||
let mut offset = index_start_blk as u64 * PAGE_SZ as u64;
|
||||
let (index_root_blk, block_buf) = self.tree.finish()?;
|
||||
let (index_root_blk, block_buf) = self
|
||||
.tree
|
||||
.finish()
|
||||
.map_err(|e| ImageLayerWriterError::Other(anyhow::anyhow!(e)))?;
|
||||
|
||||
// TODO(yuchen): https://github.com/neondatabase/neon/issues/10092
|
||||
// Should we just replace BlockBuf::blocks with one big buffer?
|
||||
for buf in block_buf.blocks {
|
||||
let (_buf, res) = file.write_all_at(buf.slice_len(), offset, ctx).await;
|
||||
res?;
|
||||
res.map_err(|e| ImageLayerWriterError::Other(anyhow::anyhow!(e)))?;
|
||||
offset += PAGE_SZ as u64;
|
||||
}
|
||||
|
||||
@@ -981,14 +1036,18 @@ impl ImageLayerWriterInner {
|
||||
};
|
||||
|
||||
// Writes summary at the first block (offset 0).
|
||||
let buf = summary.ser_into_page()?;
|
||||
let buf = summary
|
||||
.ser_into_page()
|
||||
.map_err(|e| ImageLayerWriterError::Other(anyhow::anyhow!(e)))?;
|
||||
let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await;
|
||||
res?;
|
||||
res.map_err(|e| ImageLayerWriterError::Other(anyhow::anyhow!(e)))?;
|
||||
|
||||
let metadata = file
|
||||
.metadata()
|
||||
.await
|
||||
.context("get metadata to determine file size")?;
|
||||
let metadata = file.metadata().await.map_err(|e| {
|
||||
ImageLayerWriterError::Other(anyhow::anyhow!(
|
||||
"get metadata to determine file size: {}",
|
||||
e
|
||||
))
|
||||
})?;
|
||||
|
||||
let desc = PersistentLayerDesc::new_img(
|
||||
self.tenant_shard_id,
|
||||
@@ -1011,9 +1070,9 @@ impl ImageLayerWriterInner {
|
||||
// set inner.file here. The first read will have to re-open it.
|
||||
|
||||
// fsync the file
|
||||
file.sync_all()
|
||||
.await
|
||||
.maybe_fatal_err("image_layer sync_all")?;
|
||||
file.sync_all().await.map_err(|e| {
|
||||
ImageLayerWriterError::Other(anyhow::anyhow!("image_layer sync_all: {}", e))
|
||||
})?;
|
||||
|
||||
trace!("created image layer {}", self.path);
|
||||
|
||||
@@ -1093,7 +1152,7 @@ impl ImageLayerWriter {
|
||||
key: Key,
|
||||
img: Bytes,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> Result<(), ImageLayerWriterError> {
|
||||
self.inner.as_mut().unwrap().put_image(key, img, ctx).await
|
||||
}
|
||||
|
||||
@@ -1108,7 +1167,7 @@ impl ImageLayerWriter {
|
||||
key: Key,
|
||||
raw_with_header: Bytes,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> Result<(), ImageLayerWriterError> {
|
||||
self.inner
|
||||
.as_mut()
|
||||
.unwrap()
|
||||
@@ -1132,7 +1191,7 @@ impl ImageLayerWriter {
|
||||
pub(crate) async fn finish(
|
||||
mut self,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
|
||||
) -> Result<(PersistentLayerDesc, Utf8PathBuf), ImageLayerWriterError> {
|
||||
self.inner.take().unwrap().finish(ctx, None).await
|
||||
}
|
||||
|
||||
@@ -1141,7 +1200,7 @@ impl ImageLayerWriter {
|
||||
mut self,
|
||||
end_key: Key,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
|
||||
) -> Result<(PersistentLayerDesc, Utf8PathBuf), ImageLayerWriterError> {
|
||||
self.inner.take().unwrap().finish(ctx, Some(end_key)).await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -26,6 +26,7 @@ use utils::lsn::Lsn;
|
||||
use utils::vec_map::VecMap;
|
||||
use wal_decoder::serialized_batch::{SerializedValueBatch, SerializedValueMeta, ValueMeta};
|
||||
|
||||
use super::delta_layer::DeltaLayerWriterError;
|
||||
use super::{DeltaLayerWriter, PersistentLayerDesc, ValuesReconstructState};
|
||||
use crate::assert_u64_eq_usize::{U64IsUsize, UsizeIsU64, u64_to_usize};
|
||||
use crate::config::PageServerConf;
|
||||
@@ -581,7 +582,17 @@ impl InMemoryLayer {
|
||||
estimated_in_mem_size: AtomicU64::new(0),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum InMemoryLayerError {
|
||||
#[error("flush task cancelled")]
|
||||
Cancelled,
|
||||
#[error(transparent)]
|
||||
Other(anyhow::Error),
|
||||
}
|
||||
|
||||
impl InMemoryLayer {
|
||||
/// Write path.
|
||||
///
|
||||
/// Errors are not retryable, the [`InMemoryLayer`] must be discarded, and not be read from.
|
||||
@@ -591,7 +602,7 @@ impl InMemoryLayer {
|
||||
&self,
|
||||
serialized_batch: SerializedValueBatch,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> Result<(), InMemoryLayerError> {
|
||||
let mut inner = self.inner.write().await;
|
||||
self.assert_writable();
|
||||
|
||||
@@ -605,7 +616,11 @@ impl InMemoryLayer {
|
||||
} = serialized_batch;
|
||||
|
||||
// Write the batch to the file
|
||||
inner.file.write_raw(&raw, ctx).await?;
|
||||
inner
|
||||
.file
|
||||
.write_raw(&raw, ctx)
|
||||
.await
|
||||
.map_err(|e| InMemoryLayerError::Other(anyhow::anyhow!(e)))?;
|
||||
let new_size = inner.file.len();
|
||||
|
||||
let expected_new_len = base_offset
|
||||
@@ -637,7 +652,8 @@ impl InMemoryLayer {
|
||||
batch_offset,
|
||||
len,
|
||||
will_init,
|
||||
})?;
|
||||
})
|
||||
.map_err(|e| InMemoryLayerError::Other(anyhow::anyhow!(e)))?;
|
||||
|
||||
let vec_map = inner.index.entry(key).or_default();
|
||||
let old = vec_map.append_or_update_last(lsn, index_entry).unwrap().0;
|
||||
@@ -794,14 +810,25 @@ impl InMemoryLayer {
|
||||
ctx,
|
||||
)
|
||||
.await;
|
||||
res?;
|
||||
res.map_err(|e| match e {
|
||||
DeltaLayerWriterError::Cancelled => {
|
||||
anyhow::anyhow!("flush task cancelled")
|
||||
}
|
||||
DeltaLayerWriterError::Other(err) => err,
|
||||
})?;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// MAX is used here because we identify L0 layers by full key range
|
||||
let (desc, path) = delta_layer_writer.finish(Key::MAX, ctx).await?;
|
||||
let (desc, path) = delta_layer_writer
|
||||
.finish(Key::MAX, ctx)
|
||||
.await
|
||||
.map_err(|e| match e {
|
||||
DeltaLayerWriterError::Cancelled => anyhow::anyhow!("flush task cancelled"),
|
||||
DeltaLayerWriterError::Other(err) => err,
|
||||
})?;
|
||||
|
||||
// Hold the permit until all the IO is done, including the fsync in `delta_layer_writer.finish()``.
|
||||
//
|
||||
|
||||
@@ -300,6 +300,7 @@ pub(crate) fn log_compaction_error(
|
||||
let level = match err {
|
||||
e if e.is_cancel() => return,
|
||||
ShuttingDown => return,
|
||||
Cancelled => return,
|
||||
Offload(_) => Level::ERROR,
|
||||
AlreadyRunning(_) => Level::ERROR,
|
||||
CollectKeySpaceError(_) => Level::ERROR,
|
||||
|
||||
@@ -119,6 +119,8 @@ use crate::tenant::gc_result::GcResult;
|
||||
use crate::tenant::layer_map::LayerMap;
|
||||
use crate::tenant::metadata::TimelineMetadata;
|
||||
use crate::tenant::storage_layer::delta_layer::DeltaEntry;
|
||||
use crate::tenant::storage_layer::image_layer::ImageLayerWriterError;
|
||||
use crate::tenant::storage_layer::inmemory_layer::InMemoryLayerError;
|
||||
use crate::tenant::storage_layer::inmemory_layer::IndexEntry;
|
||||
use crate::tenant::storage_layer::{
|
||||
AsLayerDesc, BatchLayerWriter, DeltaLayerWriter, EvictionError, ImageLayerName,
|
||||
@@ -773,6 +775,21 @@ impl From<layer_manager::Shutdown> for CreateImageLayersError {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<crate::tenant::storage_layer::batch_split_writer::BatchSplitWriterError>
|
||||
for CreateImageLayersError
|
||||
{
|
||||
fn from(err: crate::tenant::storage_layer::batch_split_writer::BatchSplitWriterError) -> Self {
|
||||
match err {
|
||||
crate::tenant::storage_layer::batch_split_writer::BatchSplitWriterError::Cancelled => {
|
||||
Self::Cancelled
|
||||
}
|
||||
crate::tenant::storage_layer::batch_split_writer::BatchSplitWriterError::Other(err) => {
|
||||
Self::Other(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug, Clone)]
|
||||
pub(crate) enum FlushLayerError {
|
||||
/// Timeline cancellation token was cancelled
|
||||
@@ -2041,6 +2058,9 @@ impl Timeline {
|
||||
Err(CompactionError::ShuttingDown) => {
|
||||
// Covered by the `Err(e) if e.is_cancel()` branch.
|
||||
}
|
||||
Err(CompactionError::Cancelled) => {
|
||||
// Covered by the `Err(e) if e.is_cancel()` branch.
|
||||
}
|
||||
Err(CompactionError::AlreadyRunning(_)) => {
|
||||
// Covered by the `Err(e) if e.is_cancel()` branch.
|
||||
}
|
||||
@@ -5232,7 +5252,17 @@ impl Timeline {
|
||||
};
|
||||
|
||||
// Write all the keys we just read into our new image layer.
|
||||
image_layer_writer.put_image(img_key, img, ctx).await?;
|
||||
image_layer_writer
|
||||
.put_image(img_key, img, ctx)
|
||||
.await
|
||||
.map_err(|e| match e {
|
||||
ImageLayerWriterError::Cancelled => CreateImageLayersError::Other(
|
||||
anyhow::anyhow!("flush task cancelled"),
|
||||
),
|
||||
ImageLayerWriterError::Other(err) => {
|
||||
CreateImageLayersError::Other(err)
|
||||
}
|
||||
})?;
|
||||
wrote_keys = true;
|
||||
}
|
||||
}
|
||||
@@ -5329,7 +5359,15 @@ impl Timeline {
|
||||
|
||||
// TODO: split image layers to avoid too large layer files. Too large image files are not handled
|
||||
// on the normal data path either.
|
||||
image_layer_writer.put_image(k, v, ctx).await?;
|
||||
image_layer_writer
|
||||
.put_image(k, v, ctx)
|
||||
.await
|
||||
.map_err(|e| match e {
|
||||
ImageLayerWriterError::Cancelled => {
|
||||
CreateImageLayersError::Other(anyhow::anyhow!("flush task cancelled"))
|
||||
}
|
||||
ImageLayerWriterError::Other(err) => CreateImageLayersError::Other(err),
|
||||
})?;
|
||||
}
|
||||
|
||||
if wrote_any_image {
|
||||
@@ -5843,6 +5881,8 @@ pub(crate) enum CompactionError {
|
||||
Other(anyhow::Error),
|
||||
#[error("Compaction already running: {0}")]
|
||||
AlreadyRunning(&'static str),
|
||||
#[error("cancelled")]
|
||||
Cancelled,
|
||||
}
|
||||
|
||||
impl CompactionError {
|
||||
@@ -5857,6 +5897,7 @@ impl CompactionError {
|
||||
PageReconstructError::Cancelled
|
||||
))
|
||||
| Self::Offload(OffloadError::Cancelled)
|
||||
| Self::Cancelled
|
||||
)
|
||||
}
|
||||
|
||||
@@ -6922,9 +6963,22 @@ impl Timeline {
|
||||
)
|
||||
.await?;
|
||||
for (key, img) in images {
|
||||
image_layer_writer.put_image(key, img, ctx).await?;
|
||||
image_layer_writer
|
||||
.put_image(key, img, ctx)
|
||||
.await
|
||||
.map_err(|e| match e {
|
||||
ImageLayerWriterError::Cancelled => {
|
||||
anyhow::anyhow!("flush task cancelled")
|
||||
}
|
||||
ImageLayerWriterError::Other(err) => err,
|
||||
})?;
|
||||
}
|
||||
let (desc, path) = image_layer_writer.finish(ctx).await?;
|
||||
let (desc, path) = image_layer_writer.finish(ctx).await.map_err(|e| match e {
|
||||
ImageLayerWriterError::Cancelled => {
|
||||
anyhow::anyhow!("flush task cancelled")
|
||||
}
|
||||
ImageLayerWriterError::Other(err) => err,
|
||||
})?;
|
||||
let image_layer = Layer::finish_creating(self.conf, self, desc, &path)?;
|
||||
info!("force created image layer {}", image_layer.local_path());
|
||||
{
|
||||
@@ -7378,7 +7432,10 @@ impl TimelineWriter<'_> {
|
||||
state.max_lsn = std::cmp::max(state.max_lsn, Some(batch_max_lsn));
|
||||
}
|
||||
|
||||
res
|
||||
res.map_err(|e| match e {
|
||||
InMemoryLayerError::Cancelled => anyhow::anyhow!("flush task cancelled"),
|
||||
InMemoryLayerError::Other(err) => err,
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -1547,7 +1547,7 @@ impl Timeline {
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.map_err(CompactionError::Other)?;
|
||||
.map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?;
|
||||
|
||||
// Safety of layer rewrites:
|
||||
// - We are writing to a different local file path than we are reading from, so the old Layer
|
||||
@@ -1572,7 +1572,7 @@ impl Timeline {
|
||||
let (desc, path) = image_layer_writer
|
||||
.finish(ctx)
|
||||
.await
|
||||
.map_err(CompactionError::Other)?;
|
||||
.map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?;
|
||||
let new_layer = Layer::finish_creating(self.conf, self, desc, &path)
|
||||
.map_err(CompactionError::Other)?;
|
||||
info!(layer=%new_layer, "rewrote layer, {} -> {} bytes",
|
||||
@@ -2140,7 +2140,7 @@ impl Timeline {
|
||||
.unwrap()
|
||||
.finish(prev_key.unwrap().next(), ctx)
|
||||
.await
|
||||
.map_err(CompactionError::Other)?;
|
||||
.map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?;
|
||||
let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
|
||||
.map_err(CompactionError::Other)?;
|
||||
|
||||
@@ -2199,7 +2199,7 @@ impl Timeline {
|
||||
.unwrap()
|
||||
.put_value(key, lsn, value, ctx)
|
||||
.await
|
||||
.map_err(CompactionError::Other)?;
|
||||
.map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?;
|
||||
} else {
|
||||
let owner = self.shard_identity.get_shard_number(&key);
|
||||
|
||||
@@ -2217,7 +2217,7 @@ impl Timeline {
|
||||
let (desc, path) = writer
|
||||
.finish(prev_key.unwrap().next(), ctx)
|
||||
.await
|
||||
.map_err(CompactionError::Other)?;
|
||||
.map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?;
|
||||
let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
|
||||
.map_err(CompactionError::Other)?;
|
||||
new_layers.push(new_delta);
|
||||
@@ -3682,8 +3682,7 @@ impl Timeline {
|
||||
let (desc, path) = delta_writer_before
|
||||
.finish(job_desc.compaction_key_range.start, ctx)
|
||||
.await
|
||||
.context("failed to finish delta layer writer")
|
||||
.map_err(CompactionError::Other)?;
|
||||
.map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?;
|
||||
let layer = Layer::finish_creating(self.conf, self, desc, &path)
|
||||
.context("failed to finish creating delta layer")
|
||||
.map_err(CompactionError::Other)?;
|
||||
@@ -3693,8 +3692,7 @@ impl Timeline {
|
||||
let (desc, path) = delta_writer_after
|
||||
.finish(key.key_range.end, ctx)
|
||||
.await
|
||||
.context("failed to finish delta layer writer")
|
||||
.map_err(CompactionError::Other)?;
|
||||
.map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?;
|
||||
let layer = Layer::finish_creating(self.conf, self, desc, &path)
|
||||
.context("failed to finish creating delta layer")
|
||||
.map_err(CompactionError::Other)?;
|
||||
@@ -3713,8 +3711,7 @@ impl Timeline {
|
||||
writer
|
||||
.finish_with_discard_fn(self, ctx, end_key, discard)
|
||||
.await
|
||||
.context("failed to finish image layer writer")
|
||||
.map_err(CompactionError::Other)?
|
||||
.map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?
|
||||
} else {
|
||||
drop(writer);
|
||||
Vec::new()
|
||||
@@ -3727,8 +3724,7 @@ impl Timeline {
|
||||
delta_layer_writer
|
||||
.finish_with_discard_fn(self, ctx, discard)
|
||||
.await
|
||||
.context("failed to finish delta layer writer")
|
||||
.map_err(CompactionError::Other)?
|
||||
.map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?
|
||||
} else {
|
||||
drop(delta_layer_writer);
|
||||
Vec::new()
|
||||
@@ -4253,7 +4249,10 @@ impl TimelineAdaptor {
|
||||
unfinished_image_layer,
|
||||
} = outcome
|
||||
{
|
||||
let (desc, path) = unfinished_image_layer.finish(ctx).await?;
|
||||
let (desc, path) = unfinished_image_layer
|
||||
.finish(ctx)
|
||||
.await
|
||||
.map_err(|e| CreateImageLayersError::Other(anyhow::anyhow!(e)))?;
|
||||
let image_layer =
|
||||
Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?;
|
||||
self.new_images.push(image_layer);
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::tenant::storage_layer::delta_layer::DeltaLayerWriterError;
|
||||
use anyhow::Context;
|
||||
use bytes::Bytes;
|
||||
use http_utils::error::ApiError;
|
||||
@@ -816,7 +817,10 @@ async fn copy_lsn_prefix(
|
||||
let (desc, path) = writer
|
||||
.finish(reused_highest_key, ctx)
|
||||
.await
|
||||
.map_err(Error::Prepare)?;
|
||||
.map_err(|e| match e {
|
||||
DeltaLayerWriterError::Cancelled => Error::ShuttingDown,
|
||||
DeltaLayerWriterError::Other(err) => Error::Prepare(err),
|
||||
})?;
|
||||
let copied = Layer::finish_creating(target_timeline.conf, target_timeline, desc, &path)
|
||||
.map_err(Error::Prepare)?;
|
||||
|
||||
|
||||
@@ -14,8 +14,6 @@
|
||||
use std::fs::File;
|
||||
use std::io::{Error, ErrorKind};
|
||||
use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
|
||||
#[cfg(target_os = "linux")]
|
||||
use std::os::unix::fs::OpenOptionsExt;
|
||||
use std::sync::LazyLock;
|
||||
use std::sync::atomic::{AtomicBool, AtomicU8, AtomicUsize, Ordering};
|
||||
|
||||
@@ -99,7 +97,7 @@ impl VirtualFile {
|
||||
|
||||
pub async fn open_with_options_v2<P: AsRef<Utf8Path>>(
|
||||
path: P,
|
||||
open_options: &OpenOptions,
|
||||
#[cfg_attr(not(target_os = "linux"), allow(unused_mut))] mut open_options: OpenOptions,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Self, std::io::Error> {
|
||||
let mode = get_io_mode();
|
||||
@@ -112,21 +110,16 @@ impl VirtualFile {
|
||||
#[cfg(target_os = "linux")]
|
||||
(IoMode::DirectRw, _) => true,
|
||||
};
|
||||
let open_options = open_options.clone();
|
||||
let open_options = if set_o_direct {
|
||||
if set_o_direct {
|
||||
#[cfg(target_os = "linux")]
|
||||
{
|
||||
let mut open_options = open_options;
|
||||
open_options.custom_flags(nix::libc::O_DIRECT);
|
||||
open_options
|
||||
open_options = open_options.custom_flags(nix::libc::O_DIRECT);
|
||||
}
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
unreachable!(
|
||||
"O_DIRECT is not supported on this platform, IoMode's that result in set_o_direct=true shouldn't even be defined"
|
||||
);
|
||||
} else {
|
||||
open_options
|
||||
};
|
||||
}
|
||||
let inner = VirtualFileInner::open_with_options(path, open_options, ctx).await?;
|
||||
Ok(VirtualFile { inner, _mode: mode })
|
||||
}
|
||||
@@ -530,7 +523,7 @@ impl VirtualFileInner {
|
||||
path: P,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<VirtualFileInner, std::io::Error> {
|
||||
Self::open_with_options(path.as_ref(), OpenOptions::new().read(true).clone(), ctx).await
|
||||
Self::open_with_options(path.as_ref(), OpenOptions::new().read(true), ctx).await
|
||||
}
|
||||
|
||||
/// Open a file with given options.
|
||||
@@ -558,10 +551,11 @@ impl VirtualFileInner {
|
||||
// It would perhaps be nicer to check just for the read and write flags
|
||||
// explicitly, but OpenOptions doesn't contain any functions to read flags,
|
||||
// only to set them.
|
||||
let mut reopen_options = open_options.clone();
|
||||
reopen_options.create(false);
|
||||
reopen_options.create_new(false);
|
||||
reopen_options.truncate(false);
|
||||
let reopen_options = open_options
|
||||
.clone()
|
||||
.create(false)
|
||||
.create_new(false)
|
||||
.truncate(false);
|
||||
|
||||
let vfile = VirtualFileInner {
|
||||
handle: RwLock::new(handle),
|
||||
@@ -1307,7 +1301,7 @@ mod tests {
|
||||
opts: OpenOptions,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<MaybeVirtualFile, anyhow::Error> {
|
||||
let vf = VirtualFile::open_with_options_v2(&path, &opts, ctx).await?;
|
||||
let vf = VirtualFile::open_with_options_v2(&path, opts, ctx).await?;
|
||||
Ok(MaybeVirtualFile::VirtualFile(vf))
|
||||
}
|
||||
}
|
||||
@@ -1374,7 +1368,7 @@ mod tests {
|
||||
let _ = file_a.read_string_at(0, 1, &ctx).await.unwrap_err();
|
||||
|
||||
// Close the file and re-open for reading
|
||||
let mut file_a = A::open(path_a, OpenOptions::new().read(true).to_owned(), &ctx).await?;
|
||||
let mut file_a = A::open(path_a, OpenOptions::new().read(true), &ctx).await?;
|
||||
|
||||
// cannot write to a file opened in read-only mode
|
||||
let _ = file_a
|
||||
@@ -1393,8 +1387,7 @@ mod tests {
|
||||
.read(true)
|
||||
.write(true)
|
||||
.create(true)
|
||||
.truncate(true)
|
||||
.to_owned(),
|
||||
.truncate(true),
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
@@ -1412,12 +1405,7 @@ mod tests {
|
||||
|
||||
let mut vfiles = Vec::new();
|
||||
for _ in 0..100 {
|
||||
let mut vfile = A::open(
|
||||
path_b.clone(),
|
||||
OpenOptions::new().read(true).to_owned(),
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
let mut vfile = A::open(path_b.clone(), OpenOptions::new().read(true), &ctx).await?;
|
||||
assert_eq!("FOOBAR", vfile.read_string_at(0, 6, &ctx).await?);
|
||||
vfiles.push(vfile);
|
||||
}
|
||||
@@ -1466,7 +1454,7 @@ mod tests {
|
||||
for _ in 0..VIRTUAL_FILES {
|
||||
let f = VirtualFileInner::open_with_options(
|
||||
&test_file_path,
|
||||
OpenOptions::new().read(true).clone(),
|
||||
OpenOptions::new().read(true),
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
//! Enum-dispatch to the `OpenOptions` type of the respective [`super::IoEngineKind`];
|
||||
|
||||
use std::os::fd::OwnedFd;
|
||||
use std::os::unix::fs::OpenOptionsExt;
|
||||
use std::path::Path;
|
||||
|
||||
use super::io_engine::IoEngine;
|
||||
@@ -43,7 +44,7 @@ impl OpenOptions {
|
||||
self.write
|
||||
}
|
||||
|
||||
pub fn read(&mut self, read: bool) -> &mut OpenOptions {
|
||||
pub fn read(mut self, read: bool) -> Self {
|
||||
match &mut self.inner {
|
||||
Inner::StdFs(x) => {
|
||||
let _ = x.read(read);
|
||||
@@ -56,7 +57,7 @@ impl OpenOptions {
|
||||
self
|
||||
}
|
||||
|
||||
pub fn write(&mut self, write: bool) -> &mut OpenOptions {
|
||||
pub fn write(mut self, write: bool) -> Self {
|
||||
self.write = write;
|
||||
match &mut self.inner {
|
||||
Inner::StdFs(x) => {
|
||||
@@ -70,7 +71,7 @@ impl OpenOptions {
|
||||
self
|
||||
}
|
||||
|
||||
pub fn create(&mut self, create: bool) -> &mut OpenOptions {
|
||||
pub fn create(mut self, create: bool) -> Self {
|
||||
match &mut self.inner {
|
||||
Inner::StdFs(x) => {
|
||||
let _ = x.create(create);
|
||||
@@ -83,7 +84,7 @@ impl OpenOptions {
|
||||
self
|
||||
}
|
||||
|
||||
pub fn create_new(&mut self, create_new: bool) -> &mut OpenOptions {
|
||||
pub fn create_new(mut self, create_new: bool) -> Self {
|
||||
match &mut self.inner {
|
||||
Inner::StdFs(x) => {
|
||||
let _ = x.create_new(create_new);
|
||||
@@ -96,7 +97,7 @@ impl OpenOptions {
|
||||
self
|
||||
}
|
||||
|
||||
pub fn truncate(&mut self, truncate: bool) -> &mut OpenOptions {
|
||||
pub fn truncate(mut self, truncate: bool) -> Self {
|
||||
match &mut self.inner {
|
||||
Inner::StdFs(x) => {
|
||||
let _ = x.truncate(truncate);
|
||||
@@ -124,10 +125,8 @@ impl OpenOptions {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::os::unix::prelude::OpenOptionsExt for OpenOptions {
|
||||
fn mode(&mut self, mode: u32) -> &mut OpenOptions {
|
||||
pub fn mode(mut self, mode: u32) -> Self {
|
||||
match &mut self.inner {
|
||||
Inner::StdFs(x) => {
|
||||
let _ = x.mode(mode);
|
||||
@@ -140,7 +139,7 @@ impl std::os::unix::prelude::OpenOptionsExt for OpenOptions {
|
||||
self
|
||||
}
|
||||
|
||||
fn custom_flags(&mut self, flags: i32) -> &mut OpenOptions {
|
||||
pub fn custom_flags(mut self, flags: i32) -> Self {
|
||||
match &mut self.inner {
|
||||
Inner::StdFs(x) => {
|
||||
let _ = x.custom_flags(flags);
|
||||
|
||||
@@ -150,7 +150,7 @@ NeonWALReaderFree(NeonWALReader *state)
|
||||
* fetched from timeline 'tli'.
|
||||
*
|
||||
* Returns NEON_WALREAD_SUCCESS if succeeded, NEON_WALREAD_ERROR if an error
|
||||
* occurs, in which case 'err' has the desciption. Error always closes remote
|
||||
* occurs, in which case 'err' has the description. Error always closes remote
|
||||
* connection, if there was any, so socket subscription should be removed.
|
||||
*
|
||||
* NEON_WALREAD_WOULDBLOCK means caller should obtain socket to wait for with
|
||||
|
||||
@@ -1989,8 +1989,14 @@ neon_start_unlogged_build(SMgrRelation reln)
|
||||
neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
|
||||
}
|
||||
|
||||
#if PG_MAJORVERSION_NUM >= 17
|
||||
/*
|
||||
* We have to disable this check for pg14-16 because sorted build of GIST index requires
|
||||
* to perform unlogged build several times
|
||||
*/
|
||||
if (smgrnblocks(reln, MAIN_FORKNUM) != 0)
|
||||
neon_log(ERROR, "cannot perform unlogged index build, index is not empty ");
|
||||
#endif
|
||||
|
||||
unlogged_build_rel = reln;
|
||||
unlogged_build_phase = UNLOGGED_BUILD_PHASE_1;
|
||||
|
||||
@@ -124,6 +124,7 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api)
|
||||
}
|
||||
else
|
||||
{
|
||||
wp->safekeepers_generation = INVALID_GENERATION;
|
||||
host = wp->config->safekeepers_list;
|
||||
}
|
||||
wp_log(LOG, "safekeepers_generation=%u", wp->safekeepers_generation);
|
||||
@@ -756,7 +757,7 @@ UpdateMemberSafekeeperPtr(WalProposer *wp, Safekeeper *sk)
|
||||
{
|
||||
SafekeeperId *sk_id = &wp->mconf.members.m[i];
|
||||
|
||||
if (wp->mconf.members.m[i].node_id == sk->greetResponse.nodeId)
|
||||
if (sk_id->node_id == sk->greetResponse.nodeId)
|
||||
{
|
||||
/*
|
||||
* If mconf or list of safekeepers to connect to changed (the
|
||||
@@ -781,7 +782,7 @@ UpdateMemberSafekeeperPtr(WalProposer *wp, Safekeeper *sk)
|
||||
{
|
||||
SafekeeperId *sk_id = &wp->mconf.new_members.m[i];
|
||||
|
||||
if (wp->mconf.new_members.m[i].node_id == sk->greetResponse.nodeId)
|
||||
if (sk_id->node_id == sk->greetResponse.nodeId)
|
||||
{
|
||||
if (wp->new_members_safekeepers[i] != NULL && wp->new_members_safekeepers[i] != sk)
|
||||
{
|
||||
@@ -1071,7 +1072,6 @@ RecvVoteResponse(Safekeeper *sk)
|
||||
/* ready for elected message */
|
||||
sk->state = SS_WAIT_ELECTED;
|
||||
|
||||
wp->n_votes++;
|
||||
/* Are we already elected? */
|
||||
if (wp->state == WPS_CAMPAIGN)
|
||||
{
|
||||
|
||||
@@ -845,9 +845,6 @@ typedef struct WalProposer
|
||||
/* timeline globally starts at this LSN */
|
||||
XLogRecPtr timelineStartLsn;
|
||||
|
||||
/* number of votes collected from safekeepers */
|
||||
int n_votes;
|
||||
|
||||
/* number of successful connections over the lifetime of walproposer */
|
||||
int n_connected;
|
||||
|
||||
|
||||
@@ -409,14 +409,22 @@ impl JwkCacheEntryLock {
|
||||
|
||||
if let Some(exp) = payload.expiration {
|
||||
if now >= exp + CLOCK_SKEW_LEEWAY {
|
||||
return Err(JwtError::InvalidClaims(JwtClaimsError::JwtTokenHasExpired));
|
||||
return Err(JwtError::InvalidClaims(JwtClaimsError::JwtTokenHasExpired(
|
||||
exp.duration_since(SystemTime::UNIX_EPOCH)
|
||||
.unwrap_or_default()
|
||||
.as_secs(),
|
||||
)));
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(nbf) = payload.not_before {
|
||||
if nbf >= now + CLOCK_SKEW_LEEWAY {
|
||||
return Err(JwtError::InvalidClaims(
|
||||
JwtClaimsError::JwtTokenNotYetReadyToUse,
|
||||
JwtClaimsError::JwtTokenNotYetReadyToUse(
|
||||
nbf.duration_since(SystemTime::UNIX_EPOCH)
|
||||
.unwrap_or_default()
|
||||
.as_secs(),
|
||||
),
|
||||
));
|
||||
}
|
||||
}
|
||||
@@ -534,10 +542,10 @@ struct JwtPayload<'a> {
|
||||
#[serde(rename = "aud", default)]
|
||||
audience: OneOrMany,
|
||||
/// Expiration - Time after which the JWT expires
|
||||
#[serde(deserialize_with = "numeric_date_opt", rename = "exp", default)]
|
||||
#[serde(rename = "exp", deserialize_with = "numeric_date_opt", default)]
|
||||
expiration: Option<SystemTime>,
|
||||
/// Not before - Time after which the JWT expires
|
||||
#[serde(deserialize_with = "numeric_date_opt", rename = "nbf", default)]
|
||||
/// Not before - Time before which the JWT is not valid
|
||||
#[serde(rename = "nbf", deserialize_with = "numeric_date_opt", default)]
|
||||
not_before: Option<SystemTime>,
|
||||
|
||||
// the following entries are only extracted for the sake of debug logging.
|
||||
@@ -609,8 +617,15 @@ impl<'de> Deserialize<'de> for OneOrMany {
|
||||
}
|
||||
|
||||
fn numeric_date_opt<'de, D: Deserializer<'de>>(d: D) -> Result<Option<SystemTime>, D::Error> {
|
||||
let d = <Option<u64>>::deserialize(d)?;
|
||||
Ok(d.map(|n| SystemTime::UNIX_EPOCH + Duration::from_secs(n)))
|
||||
<Option<u64>>::deserialize(d)?
|
||||
.map(|t| {
|
||||
SystemTime::UNIX_EPOCH
|
||||
.checked_add(Duration::from_secs(t))
|
||||
.ok_or_else(|| {
|
||||
serde::de::Error::custom(format_args!("timestamp out of bounds: {t}"))
|
||||
})
|
||||
})
|
||||
.transpose()
|
||||
}
|
||||
|
||||
struct JwkRenewalPermit<'a> {
|
||||
@@ -746,11 +761,11 @@ pub enum JwtClaimsError {
|
||||
#[error("invalid JWT token audience")]
|
||||
InvalidJwtTokenAudience,
|
||||
|
||||
#[error("JWT token has expired")]
|
||||
JwtTokenHasExpired,
|
||||
#[error("JWT token has expired (exp={0})")]
|
||||
JwtTokenHasExpired(u64),
|
||||
|
||||
#[error("JWT token is not yet ready to use")]
|
||||
JwtTokenNotYetReadyToUse,
|
||||
#[error("JWT token is not yet ready to use (nbf={0})")]
|
||||
JwtTokenNotYetReadyToUse(u64),
|
||||
}
|
||||
|
||||
#[allow(dead_code, reason = "Debug use only")]
|
||||
@@ -1233,14 +1248,14 @@ X0n5X2/pBLJzxZc62ccvZYVnctBiFs6HbSnxpuMQCfkt/BcR/ttIepBQQIW86wHL
|
||||
"nbf": now + 60,
|
||||
"aud": "neon",
|
||||
}},
|
||||
error: JwtClaimsError::JwtTokenNotYetReadyToUse,
|
||||
error: JwtClaimsError::JwtTokenNotYetReadyToUse(now + 60),
|
||||
},
|
||||
Test {
|
||||
body: json! {{
|
||||
"exp": now - 60,
|
||||
"aud": ["neon"],
|
||||
}},
|
||||
error: JwtClaimsError::JwtTokenHasExpired,
|
||||
error: JwtClaimsError::JwtTokenHasExpired(now - 60),
|
||||
},
|
||||
Test {
|
||||
body: json! {{
|
||||
|
||||
@@ -32,12 +32,6 @@ pub(crate) enum ComputeUserInfoParseError {
|
||||
option: EndpointId,
|
||||
},
|
||||
|
||||
#[error(
|
||||
"Common name inferred from SNI ('{}') is not known",
|
||||
.cn,
|
||||
)]
|
||||
UnknownCommonName { cn: String },
|
||||
|
||||
#[error("Project name ('{0}') must contain only alphanumeric characters and hyphen.")]
|
||||
MalformedProjectName(EndpointId),
|
||||
}
|
||||
@@ -66,22 +60,15 @@ impl ComputeUserInfoMaybeEndpoint {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn endpoint_sni(
|
||||
sni: &str,
|
||||
common_names: &HashSet<String>,
|
||||
) -> Result<Option<EndpointId>, ComputeUserInfoParseError> {
|
||||
let Some((subdomain, common_name)) = sni.split_once('.') else {
|
||||
return Err(ComputeUserInfoParseError::UnknownCommonName { cn: sni.into() });
|
||||
};
|
||||
pub(crate) fn endpoint_sni(sni: &str, common_names: &HashSet<String>) -> Option<EndpointId> {
|
||||
let (subdomain, common_name) = sni.split_once('.')?;
|
||||
if !common_names.contains(common_name) {
|
||||
return Err(ComputeUserInfoParseError::UnknownCommonName {
|
||||
cn: common_name.into(),
|
||||
});
|
||||
return None;
|
||||
}
|
||||
if subdomain == SERVERLESS_DRIVER_SNI {
|
||||
return Ok(None);
|
||||
return None;
|
||||
}
|
||||
Ok(Some(EndpointId::from(subdomain)))
|
||||
Some(EndpointId::from(subdomain))
|
||||
}
|
||||
|
||||
impl ComputeUserInfoMaybeEndpoint {
|
||||
@@ -113,15 +100,8 @@ impl ComputeUserInfoMaybeEndpoint {
|
||||
})
|
||||
.map(|name| name.into());
|
||||
|
||||
let endpoint_from_domain = if let Some(sni_str) = sni {
|
||||
if let Some(cn) = common_names {
|
||||
endpoint_sni(sni_str, cn)?
|
||||
} else {
|
||||
None
|
||||
}
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let endpoint_from_domain =
|
||||
sni.and_then(|sni_str| common_names.and_then(|cn| endpoint_sni(sni_str, cn)));
|
||||
|
||||
let endpoint = match (endpoint_option, endpoint_from_domain) {
|
||||
// Invariant: if we have both project name variants, they should match.
|
||||
@@ -424,21 +404,34 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_inconsistent_sni() {
|
||||
fn parse_unknown_sni() {
|
||||
let options = StartupMessageParams::new([("user", "john_doe")]);
|
||||
|
||||
let sni = Some("project.localhost");
|
||||
let common_names = Some(["example.com".into()].into());
|
||||
|
||||
let ctx = RequestContext::test();
|
||||
let err = ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, sni, common_names.as_ref())
|
||||
.expect_err("should fail");
|
||||
match err {
|
||||
UnknownCommonName { cn } => {
|
||||
assert_eq!(cn, "localhost");
|
||||
}
|
||||
_ => panic!("bad error: {err:?}"),
|
||||
}
|
||||
let info = ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, sni, common_names.as_ref())
|
||||
.unwrap();
|
||||
|
||||
assert!(info.endpoint_id.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_unknown_sni_with_options() {
|
||||
let options = StartupMessageParams::new([
|
||||
("user", "john_doe"),
|
||||
("options", "endpoint=foo-bar-baz-1234"),
|
||||
]);
|
||||
|
||||
let sni = Some("project.localhost");
|
||||
let common_names = Some(["example.com".into()].into());
|
||||
|
||||
let ctx = RequestContext::test();
|
||||
let info = ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, sni, common_names.as_ref())
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(info.endpoint_id.as_deref(), Some("foo-bar-baz-1234"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -24,9 +24,6 @@ pub(crate) enum HandshakeError {
|
||||
#[error("protocol violation")]
|
||||
ProtocolViolation,
|
||||
|
||||
#[error("missing certificate")]
|
||||
MissingCertificate,
|
||||
|
||||
#[error("{0}")]
|
||||
StreamUpgradeError(#[from] StreamUpgradeError),
|
||||
|
||||
@@ -42,10 +39,6 @@ impl ReportableError for HandshakeError {
|
||||
match self {
|
||||
HandshakeError::EarlyData => crate::error::ErrorKind::User,
|
||||
HandshakeError::ProtocolViolation => crate::error::ErrorKind::User,
|
||||
// This error should not happen, but will if we have no default certificate and
|
||||
// the client sends no SNI extension.
|
||||
// If they provide SNI then we can be sure there is a certificate that matches.
|
||||
HandshakeError::MissingCertificate => crate::error::ErrorKind::Service,
|
||||
HandshakeError::StreamUpgradeError(upgrade) => match upgrade {
|
||||
StreamUpgradeError::AlreadyTls => crate::error::ErrorKind::Service,
|
||||
StreamUpgradeError::Io(_) => crate::error::ErrorKind::ClientDisconnect,
|
||||
@@ -146,7 +139,7 @@ pub(crate) async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
// try parse endpoint
|
||||
let ep = conn_info
|
||||
.server_name()
|
||||
.and_then(|sni| endpoint_sni(sni, &tls.common_names).ok().flatten());
|
||||
.and_then(|sni| endpoint_sni(sni, &tls.common_names));
|
||||
if let Some(ep) = ep {
|
||||
ctx.set_endpoint_id(ep);
|
||||
}
|
||||
@@ -161,10 +154,8 @@ pub(crate) async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
}
|
||||
}
|
||||
|
||||
let (_, tls_server_end_point) = tls
|
||||
.cert_resolver
|
||||
.resolve(conn_info.server_name())
|
||||
.ok_or(HandshakeError::MissingCertificate)?;
|
||||
let (_, tls_server_end_point) =
|
||||
tls.cert_resolver.resolve(conn_info.server_name());
|
||||
|
||||
stream = PqStream {
|
||||
framed: Framed {
|
||||
|
||||
@@ -98,8 +98,7 @@ fn generate_tls_config<'a>(
|
||||
.with_no_client_auth()
|
||||
.with_single_cert(vec![cert.clone()], key.clone_key())?;
|
||||
|
||||
let mut cert_resolver = CertResolver::new();
|
||||
cert_resolver.add_cert(key, vec![cert], true)?;
|
||||
let cert_resolver = CertResolver::new(key, vec![cert])?;
|
||||
|
||||
let common_names = cert_resolver.get_common_names();
|
||||
|
||||
|
||||
@@ -199,8 +199,7 @@ fn get_conn_info(
|
||||
let endpoint = match connection_url.host() {
|
||||
Some(url::Host::Domain(hostname)) => {
|
||||
if let Some(tls) = tls {
|
||||
endpoint_sni(hostname, &tls.common_names)?
|
||||
.ok_or(ConnInfoError::MalformedEndpoint)?
|
||||
endpoint_sni(hostname, &tls.common_names).ok_or(ConnInfoError::MalformedEndpoint)?
|
||||
} else {
|
||||
hostname
|
||||
.split_once('.')
|
||||
|
||||
@@ -5,6 +5,7 @@ use anyhow::{Context, bail};
|
||||
use itertools::Itertools;
|
||||
use rustls::crypto::ring::{self, sign};
|
||||
use rustls::pki_types::{CertificateDer, PrivateKeyDer};
|
||||
use rustls::sign::CertifiedKey;
|
||||
use x509_cert::der::{Reader, SliceReader};
|
||||
|
||||
use super::{PG_ALPN_PROTOCOL, TlsServerEndPoint};
|
||||
@@ -25,10 +26,8 @@ pub fn configure_tls(
|
||||
certs_dir: Option<&String>,
|
||||
allow_tls_keylogfile: bool,
|
||||
) -> anyhow::Result<TlsConfig> {
|
||||
let mut cert_resolver = CertResolver::new();
|
||||
|
||||
// add default certificate
|
||||
cert_resolver.add_cert_path(key_path, cert_path, true)?;
|
||||
let mut cert_resolver = CertResolver::parse_new(key_path, cert_path)?;
|
||||
|
||||
// add extra certificates
|
||||
if let Some(certs_dir) = certs_dir {
|
||||
@@ -40,11 +39,8 @@ pub fn configure_tls(
|
||||
let key_path = path.join("tls.key");
|
||||
let cert_path = path.join("tls.crt");
|
||||
if key_path.exists() && cert_path.exists() {
|
||||
cert_resolver.add_cert_path(
|
||||
&key_path.to_string_lossy(),
|
||||
&cert_path.to_string_lossy(),
|
||||
false,
|
||||
)?;
|
||||
cert_resolver
|
||||
.add_cert_path(&key_path.to_string_lossy(), &cert_path.to_string_lossy())?;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -83,92 +79,42 @@ pub fn configure_tls(
|
||||
})
|
||||
}
|
||||
|
||||
#[derive(Default, Debug)]
|
||||
#[derive(Debug)]
|
||||
pub struct CertResolver {
|
||||
certs: HashMap<String, (Arc<rustls::sign::CertifiedKey>, TlsServerEndPoint)>,
|
||||
default: Option<(Arc<rustls::sign::CertifiedKey>, TlsServerEndPoint)>,
|
||||
default: (Arc<rustls::sign::CertifiedKey>, TlsServerEndPoint),
|
||||
}
|
||||
|
||||
impl CertResolver {
|
||||
pub fn new() -> Self {
|
||||
Self::default()
|
||||
fn parse_new(key_path: &str, cert_path: &str) -> anyhow::Result<Self> {
|
||||
let (priv_key, cert_chain) = parse_key_cert(key_path, cert_path)?;
|
||||
Self::new(priv_key, cert_chain)
|
||||
}
|
||||
|
||||
fn add_cert_path(
|
||||
&mut self,
|
||||
key_path: &str,
|
||||
cert_path: &str,
|
||||
is_default: bool,
|
||||
) -> anyhow::Result<()> {
|
||||
let priv_key = {
|
||||
let key_bytes = std::fs::read(key_path)
|
||||
.with_context(|| format!("Failed to read TLS keys at '{key_path}'"))?;
|
||||
rustls_pemfile::private_key(&mut &key_bytes[..])
|
||||
.with_context(|| format!("Failed to parse TLS keys at '{key_path}'"))?
|
||||
.with_context(|| format!("Failed to parse TLS keys at '{key_path}'"))?
|
||||
};
|
||||
pub fn new(
|
||||
priv_key: PrivateKeyDer<'static>,
|
||||
cert_chain: Vec<CertificateDer<'static>>,
|
||||
) -> anyhow::Result<Self> {
|
||||
let (common_name, cert, tls_server_end_point) = process_key_cert(priv_key, cert_chain)?;
|
||||
|
||||
let cert_chain_bytes = std::fs::read(cert_path)
|
||||
.context(format!("Failed to read TLS cert file at '{cert_path}.'"))?;
|
||||
|
||||
let cert_chain = {
|
||||
rustls_pemfile::certs(&mut &cert_chain_bytes[..])
|
||||
.try_collect()
|
||||
.with_context(|| {
|
||||
format!("Failed to read TLS certificate chain from bytes from file at '{cert_path}'.")
|
||||
})?
|
||||
};
|
||||
|
||||
self.add_cert(priv_key, cert_chain, is_default)
|
||||
let mut certs = HashMap::new();
|
||||
let default = (cert.clone(), tls_server_end_point);
|
||||
certs.insert(common_name, (cert, tls_server_end_point));
|
||||
Ok(Self { certs, default })
|
||||
}
|
||||
|
||||
pub fn add_cert(
|
||||
fn add_cert_path(&mut self, key_path: &str, cert_path: &str) -> anyhow::Result<()> {
|
||||
let (priv_key, cert_chain) = parse_key_cert(key_path, cert_path)?;
|
||||
self.add_cert(priv_key, cert_chain)
|
||||
}
|
||||
|
||||
fn add_cert(
|
||||
&mut self,
|
||||
priv_key: PrivateKeyDer<'static>,
|
||||
cert_chain: Vec<CertificateDer<'static>>,
|
||||
is_default: bool,
|
||||
) -> anyhow::Result<()> {
|
||||
let key = sign::any_supported_type(&priv_key).context("invalid private key")?;
|
||||
|
||||
let first_cert = &cert_chain[0];
|
||||
let tls_server_end_point = TlsServerEndPoint::new(first_cert)?;
|
||||
|
||||
let certificate = SliceReader::new(first_cert)
|
||||
.context("Failed to parse cerficiate")?
|
||||
.decode::<x509_cert::Certificate>()
|
||||
.context("Failed to parse cerficiate")?;
|
||||
|
||||
let common_name = certificate.tbs_certificate.subject.to_string();
|
||||
|
||||
// We need to get the canonical name for this certificate so we can match them against any domain names
|
||||
// seen within the proxy codebase.
|
||||
//
|
||||
// In scram-proxy we use wildcard certificates only, with the database endpoint as the wildcard subdomain, taken from SNI.
|
||||
// We need to remove the wildcard prefix for the purposes of certificate selection.
|
||||
//
|
||||
// auth-broker does not use SNI and instead uses the Neon-Connection-String header.
|
||||
// Auth broker has the subdomain `apiauth` we need to remove for the purposes of validating the Neon-Connection-String.
|
||||
//
|
||||
// Console Redirect proxy does not use any wildcard domains and does not need any certificate selection or conn string
|
||||
// validation, so let's we can continue with any common-name
|
||||
let common_name = if let Some(s) = common_name.strip_prefix("CN=*.") {
|
||||
s.to_string()
|
||||
} else if let Some(s) = common_name.strip_prefix("CN=apiauth.") {
|
||||
s.to_string()
|
||||
} else if let Some(s) = common_name.strip_prefix("CN=") {
|
||||
s.to_string()
|
||||
} else {
|
||||
bail!("Failed to parse common name from certificate")
|
||||
};
|
||||
|
||||
let cert = Arc::new(rustls::sign::CertifiedKey::new(cert_chain, key));
|
||||
|
||||
if is_default {
|
||||
self.default = Some((cert.clone(), tls_server_end_point));
|
||||
}
|
||||
|
||||
let (common_name, cert, tls_server_end_point) = process_key_cert(priv_key, cert_chain)?;
|
||||
self.certs.insert(common_name, (cert, tls_server_end_point));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -177,12 +123,82 @@ impl CertResolver {
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_key_cert(
|
||||
key_path: &str,
|
||||
cert_path: &str,
|
||||
) -> anyhow::Result<(PrivateKeyDer<'static>, Vec<CertificateDer<'static>>)> {
|
||||
let priv_key = {
|
||||
let key_bytes = std::fs::read(key_path)
|
||||
.with_context(|| format!("Failed to read TLS keys at '{key_path}'"))?;
|
||||
rustls_pemfile::private_key(&mut &key_bytes[..])
|
||||
.with_context(|| format!("Failed to parse TLS keys at '{key_path}'"))?
|
||||
.with_context(|| format!("Failed to parse TLS keys at '{key_path}'"))?
|
||||
};
|
||||
|
||||
let cert_chain_bytes = std::fs::read(cert_path)
|
||||
.context(format!("Failed to read TLS cert file at '{cert_path}.'"))?;
|
||||
|
||||
let cert_chain = {
|
||||
rustls_pemfile::certs(&mut &cert_chain_bytes[..])
|
||||
.try_collect()
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to read TLS certificate chain from bytes from file at '{cert_path}'."
|
||||
)
|
||||
})?
|
||||
};
|
||||
|
||||
Ok((priv_key, cert_chain))
|
||||
}
|
||||
|
||||
fn process_key_cert(
|
||||
priv_key: PrivateKeyDer<'static>,
|
||||
cert_chain: Vec<CertificateDer<'static>>,
|
||||
) -> anyhow::Result<(String, Arc<CertifiedKey>, TlsServerEndPoint)> {
|
||||
let key = sign::any_supported_type(&priv_key).context("invalid private key")?;
|
||||
|
||||
let first_cert = &cert_chain[0];
|
||||
let tls_server_end_point = TlsServerEndPoint::new(first_cert)?;
|
||||
|
||||
let certificate = SliceReader::new(first_cert)
|
||||
.context("Failed to parse cerficiate")?
|
||||
.decode::<x509_cert::Certificate>()
|
||||
.context("Failed to parse cerficiate")?;
|
||||
|
||||
let common_name = certificate.tbs_certificate.subject.to_string();
|
||||
|
||||
// We need to get the canonical name for this certificate so we can match them against any domain names
|
||||
// seen within the proxy codebase.
|
||||
//
|
||||
// In scram-proxy we use wildcard certificates only, with the database endpoint as the wildcard subdomain, taken from SNI.
|
||||
// We need to remove the wildcard prefix for the purposes of certificate selection.
|
||||
//
|
||||
// auth-broker does not use SNI and instead uses the Neon-Connection-String header.
|
||||
// Auth broker has the subdomain `apiauth` we need to remove for the purposes of validating the Neon-Connection-String.
|
||||
//
|
||||
// Console Redirect proxy does not use any wildcard domains and does not need any certificate selection or conn string
|
||||
// validation, so let's we can continue with any common-name
|
||||
let common_name = if let Some(s) = common_name.strip_prefix("CN=*.") {
|
||||
s.to_string()
|
||||
} else if let Some(s) = common_name.strip_prefix("CN=apiauth.") {
|
||||
s.to_string()
|
||||
} else if let Some(s) = common_name.strip_prefix("CN=") {
|
||||
s.to_string()
|
||||
} else {
|
||||
bail!("Failed to parse common name from certificate")
|
||||
};
|
||||
|
||||
let cert = Arc::new(rustls::sign::CertifiedKey::new(cert_chain, key));
|
||||
|
||||
Ok((common_name, cert, tls_server_end_point))
|
||||
}
|
||||
|
||||
impl rustls::server::ResolvesServerCert for CertResolver {
|
||||
fn resolve(
|
||||
&self,
|
||||
client_hello: rustls::server::ClientHello<'_>,
|
||||
) -> Option<Arc<rustls::sign::CertifiedKey>> {
|
||||
self.resolve(client_hello.server_name()).map(|x| x.0)
|
||||
Some(self.resolve(client_hello.server_name()).0)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -190,7 +206,7 @@ impl CertResolver {
|
||||
pub fn resolve(
|
||||
&self,
|
||||
server_name: Option<&str>,
|
||||
) -> Option<(Arc<rustls::sign::CertifiedKey>, TlsServerEndPoint)> {
|
||||
) -> (Arc<rustls::sign::CertifiedKey>, TlsServerEndPoint) {
|
||||
// loop here and cut off more and more subdomains until we find
|
||||
// a match to get a proper wildcard support. OTOH, we now do not
|
||||
// use nested domains, so keep this simple for now.
|
||||
@@ -200,12 +216,17 @@ impl CertResolver {
|
||||
if let Some(mut sni_name) = server_name {
|
||||
loop {
|
||||
if let Some(cert) = self.certs.get(sni_name) {
|
||||
return Some(cert.clone());
|
||||
return cert.clone();
|
||||
}
|
||||
if let Some((_, rest)) = sni_name.split_once('.') {
|
||||
sni_name = rest;
|
||||
} else {
|
||||
return None;
|
||||
// The customer has some custom DNS mapping - just return
|
||||
// a default certificate.
|
||||
//
|
||||
// This will error if the customer uses anything stronger
|
||||
// than sslmode=require. That's a choice they can make.
|
||||
return self.default.clone();
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::sync::Arc;
|
||||
use std::time::SystemTime;
|
||||
|
||||
use futures_util::StreamExt;
|
||||
@@ -56,7 +55,7 @@ impl TimelineAnalysis {
|
||||
pub(crate) async fn branch_cleanup_and_check_errors(
|
||||
remote_client: &GenericRemoteStorage,
|
||||
id: &TenantShardTimelineId,
|
||||
tenant_objects: Arc<tokio::sync::Mutex<TenantObjectListing>>,
|
||||
tenant_objects: &mut TenantObjectListing,
|
||||
s3_active_branch: Option<&BranchData>,
|
||||
console_branch: Option<BranchData>,
|
||||
s3_data: Option<RemoteTimelineBlobData>,
|
||||
@@ -151,11 +150,7 @@ pub(crate) async fn branch_cleanup_and_check_errors(
|
||||
))
|
||||
}
|
||||
|
||||
if !tenant_objects
|
||||
.lock()
|
||||
.await
|
||||
.check_ref(id.timeline_id, &layer, &metadata)
|
||||
{
|
||||
if !tenant_objects.check_ref(id.timeline_id, &layer, &metadata) {
|
||||
let path = remote_layer_path(
|
||||
&id.tenant_shard_id.tenant_id,
|
||||
&id.timeline_id,
|
||||
|
||||
@@ -73,12 +73,8 @@ enum Command {
|
||||
node_kind: NodeKind,
|
||||
#[arg(short, long, default_value_t = false)]
|
||||
json: bool,
|
||||
/// If provided, only these tenants will be listed from the remote storage.
|
||||
#[arg(long = "tenant-id", num_args = 0..)]
|
||||
tenant_ids: Vec<TenantShardId>,
|
||||
/// If provided, we will list all tenants, but then filter with the prefix.
|
||||
#[arg(long = "tenant-id-prefix")]
|
||||
tenant_id_prefix: Option<TenantId>,
|
||||
#[arg(long = "post", default_value_t = false)]
|
||||
post_to_storcon: bool,
|
||||
#[arg(long, default_value = None)]
|
||||
@@ -182,7 +178,6 @@ async fn main() -> anyhow::Result<()> {
|
||||
Command::ScanMetadata {
|
||||
json,
|
||||
tenant_ids,
|
||||
tenant_id_prefix,
|
||||
node_kind,
|
||||
post_to_storcon,
|
||||
dump_db_connstr,
|
||||
@@ -191,9 +186,6 @@ async fn main() -> anyhow::Result<()> {
|
||||
verbose,
|
||||
} => {
|
||||
if let NodeKind::Safekeeper = node_kind {
|
||||
if tenant_id_prefix.is_some() {
|
||||
bail!("`tenant_id_prefix` is not supported for safekeeper node_kind");
|
||||
}
|
||||
let db_or_list = match (timeline_lsns, dump_db_connstr) {
|
||||
(Some(timeline_lsns), _) => {
|
||||
let timeline_lsns = serde_json::from_str(&timeline_lsns)
|
||||
@@ -235,7 +227,6 @@ async fn main() -> anyhow::Result<()> {
|
||||
bucket_config,
|
||||
controller_client.as_ref(),
|
||||
tenant_ids,
|
||||
tenant_id_prefix,
|
||||
json,
|
||||
post_to_storcon,
|
||||
verbose,
|
||||
@@ -347,7 +338,6 @@ pub async fn run_cron_job(
|
||||
bucket_config,
|
||||
controller_client,
|
||||
Vec::new(),
|
||||
None,
|
||||
true,
|
||||
post_to_storcon,
|
||||
false, // default to non-verbose mode
|
||||
@@ -394,12 +384,10 @@ pub async fn pageserver_physical_gc_cmd(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn scan_pageserver_metadata_cmd(
|
||||
bucket_config: BucketConfig,
|
||||
controller_client: Option<&control_api::Client>,
|
||||
tenant_shard_ids: Vec<TenantShardId>,
|
||||
tenant_id_prefix: Option<TenantId>,
|
||||
json: bool,
|
||||
post_to_storcon: bool,
|
||||
verbose: bool,
|
||||
@@ -410,14 +398,7 @@ pub async fn scan_pageserver_metadata_cmd(
|
||||
"Posting pageserver scan health status to storage controller requires `--controller-api` and `--controller-jwt` to run"
|
||||
));
|
||||
}
|
||||
match scan_pageserver_metadata(
|
||||
bucket_config.clone(),
|
||||
tenant_shard_ids,
|
||||
tenant_id_prefix,
|
||||
verbose,
|
||||
)
|
||||
.await
|
||||
{
|
||||
match scan_pageserver_metadata(bucket_config.clone(), tenant_shard_ids, verbose).await {
|
||||
Err(e) => {
|
||||
tracing::error!("Failed: {e}");
|
||||
Err(e)
|
||||
|
||||
@@ -1,7 +1,5 @@
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::sync::Arc;
|
||||
|
||||
use futures::SinkExt;
|
||||
use futures_util::{StreamExt, TryStreamExt};
|
||||
use pageserver::tenant::remote_timeline_client::remote_layer_path;
|
||||
use pageserver_api::controller_api::MetadataHealthUpdateRequest;
|
||||
@@ -9,7 +7,6 @@ use pageserver_api::shard::TenantShardId;
|
||||
use remote_storage::GenericRemoteStorage;
|
||||
use serde::Serialize;
|
||||
use tracing::{Instrument, info_span};
|
||||
use utils::generation::Generation;
|
||||
use utils::id::TenantId;
|
||||
use utils::shard::ShardCount;
|
||||
|
||||
@@ -17,12 +14,10 @@ use crate::checks::{
|
||||
BlobDataParseResult, RemoteTimelineBlobData, TenantObjectListing, TimelineAnalysis,
|
||||
branch_cleanup_and_check_errors, list_timeline_blobs,
|
||||
};
|
||||
use crate::metadata_stream::{
|
||||
stream_tenant_timelines, stream_tenants, stream_tenants_maybe_prefix,
|
||||
};
|
||||
use crate::metadata_stream::{stream_tenant_timelines, stream_tenants};
|
||||
use crate::{BucketConfig, NodeKind, RootTarget, TenantShardTimelineId, init_remote};
|
||||
|
||||
#[derive(Serialize, Default, Clone)]
|
||||
#[derive(Serialize, Default)]
|
||||
pub struct MetadataSummary {
|
||||
tenant_count: usize,
|
||||
timeline_count: usize,
|
||||
@@ -107,13 +102,13 @@ impl MetadataSummary {
|
||||
|
||||
format!(
|
||||
"Tenants: {}
|
||||
Timelines: {}
|
||||
Timeline-shards: {}
|
||||
With errors: {}
|
||||
With warnings: {}
|
||||
With orphan layers: {}
|
||||
Index versions: {version_summary}
|
||||
",
|
||||
Timelines: {}
|
||||
Timeline-shards: {}
|
||||
With errors: {}
|
||||
With warnings: {}
|
||||
With orphan layers: {}
|
||||
Index versions: {version_summary}
|
||||
",
|
||||
self.tenant_count,
|
||||
self.timeline_count,
|
||||
self.timeline_shard_count,
|
||||
@@ -143,243 +138,24 @@ impl MetadataSummary {
|
||||
pub async fn scan_pageserver_metadata(
|
||||
bucket_config: BucketConfig,
|
||||
tenant_ids: Vec<TenantShardId>,
|
||||
tenant_id_prefix: Option<TenantId>,
|
||||
verbose: bool,
|
||||
) -> anyhow::Result<MetadataSummary> {
|
||||
let (remote_client, target) = init_remote(bucket_config, NodeKind::Pageserver).await?;
|
||||
|
||||
if !tenant_ids.is_empty() && tenant_id_prefix.is_some() {
|
||||
anyhow::bail!("`tenant_id_prefix` is not supported when `tenant_ids` is provided");
|
||||
}
|
||||
let tenants = if tenant_ids.is_empty() {
|
||||
futures::future::Either::Left(stream_tenants(&remote_client, &target))
|
||||
} else {
|
||||
futures::future::Either::Right(futures::stream::iter(tenant_ids.into_iter().map(Ok)))
|
||||
};
|
||||
|
||||
let (mut list_tenants_tx, list_tenants_rx) = futures::channel::mpsc::channel(1);
|
||||
let remote_client_inner = remote_client.clone();
|
||||
let target_inner = target.clone();
|
||||
let list_tenants = tokio::spawn(async move {
|
||||
let mut cnt = 0;
|
||||
if tenant_ids.is_empty() {
|
||||
if let Some(tenant_id_prefix) = tenant_id_prefix {
|
||||
let stream = stream_tenants_maybe_prefix(
|
||||
&remote_client_inner,
|
||||
&target_inner,
|
||||
Some(tenant_id_prefix.to_string()),
|
||||
);
|
||||
let mut stream = Box::pin(stream);
|
||||
while let Some(tenant) = stream.next().await {
|
||||
let tenant = tenant?;
|
||||
list_tenants_tx.send(tenant).await?;
|
||||
cnt += 1;
|
||||
}
|
||||
} else {
|
||||
let stream = stream_tenants(&remote_client_inner, &target_inner);
|
||||
let mut stream = Box::pin(stream);
|
||||
while let Some(tenant) = stream.next().await {
|
||||
let tenant = tenant?;
|
||||
list_tenants_tx.send(tenant).await?;
|
||||
cnt += 1;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
for tenant_id in tenant_ids {
|
||||
list_tenants_tx.send(tenant_id).await?;
|
||||
cnt += 1;
|
||||
}
|
||||
}
|
||||
tracing::info!("list_tenants: collected {} tenants", cnt);
|
||||
Ok::<_, anyhow::Error>(())
|
||||
});
|
||||
// How many tenants to process in parallel. We need to be mindful of pageservers
|
||||
// accessing the same per tenant prefixes, so use a lower setting than pageservers.
|
||||
const CONCURRENCY: usize = 32;
|
||||
|
||||
let (mut list_timelines_tx, list_timelines_rx) = futures::channel::mpsc::channel(1);
|
||||
let remote_client_inner = remote_client.clone();
|
||||
let target_inner = target.clone();
|
||||
let list_timelines = tokio::spawn(async move {
|
||||
let stream = list_tenants_rx
|
||||
.map(|tenant_id| {
|
||||
stream_tenant_timelines(&remote_client_inner, &target_inner, tenant_id)
|
||||
})
|
||||
.buffered(8)
|
||||
.try_flatten();
|
||||
let mut stream = Box::pin(stream);
|
||||
while let Some(item) = stream.next().await {
|
||||
let item = item?;
|
||||
list_timelines_tx.send(item).await?;
|
||||
}
|
||||
Ok::<_, anyhow::Error>(())
|
||||
});
|
||||
|
||||
let (mut read_timelines_tx, read_timelines_rx) = futures::channel::mpsc::channel(1);
|
||||
let remote_client_inner = remote_client.clone();
|
||||
let target_inner = target.clone();
|
||||
let read_timelines = tokio::spawn(async move {
|
||||
let stream = list_timelines_rx
|
||||
.map(|ttid| report_on_timeline(&remote_client_inner, &target_inner, ttid))
|
||||
.buffered(32);
|
||||
let mut stream = Box::pin(stream);
|
||||
while let Some(item) = stream.next().await {
|
||||
let item = item?;
|
||||
read_timelines_tx.send(item).await?;
|
||||
}
|
||||
Ok::<_, anyhow::Error>(())
|
||||
});
|
||||
|
||||
let summary = Arc::new(tokio::sync::Mutex::new(MetadataSummary::new()));
|
||||
let summary_inner = summary.clone();
|
||||
|
||||
let (mut consolidate_tenants_tx, consolidate_tenants_rx) = futures::channel::mpsc::channel(32);
|
||||
let consolidate_tenants = tokio::spawn(async move {
|
||||
// We must gather all the TenantShardTimelineId->S3TimelineBlobData for each tenant, because different
|
||||
// shards in the same tenant might refer to one anothers' keys if a shard split has happened.
|
||||
|
||||
let mut tenant_id = None;
|
||||
let mut tenant_objects = TenantObjectListing::default();
|
||||
let mut tenant_timeline_results = Vec::new();
|
||||
|
||||
// Iterate through all the timeline results. These are in key-order, so
|
||||
// all results for the same tenant will be adjacent. We accumulate these,
|
||||
// and then call `analyze_tenant` to flush, when we see the next tenant ID.
|
||||
let mut highest_shard_count = ShardCount::MIN;
|
||||
let mut read_timelines_rx = read_timelines_rx;
|
||||
while let Some(i) = read_timelines_rx.next().await {
|
||||
let (ttid, data) = i;
|
||||
{
|
||||
let mut guard = summary_inner.lock().await;
|
||||
guard.update_data(&data);
|
||||
}
|
||||
|
||||
match tenant_id {
|
||||
Some(prev_tenant_id) => {
|
||||
if prev_tenant_id != ttid.tenant_shard_id.tenant_id {
|
||||
// New tenant: analyze this tenant's timelines, clear accumulated tenant_timeline_results
|
||||
let tenant_objects = std::mem::take(&mut tenant_objects);
|
||||
let timelines = std::mem::take(&mut tenant_timeline_results);
|
||||
analyze_tenant(
|
||||
summary_inner.clone(),
|
||||
Arc::new(tokio::sync::Mutex::new(tenant_objects)),
|
||||
timelines,
|
||||
highest_shard_count,
|
||||
&mut consolidate_tenants_tx,
|
||||
)
|
||||
.await?;
|
||||
tenant_id = Some(ttid.tenant_shard_id.tenant_id);
|
||||
highest_shard_count = ttid.tenant_shard_id.shard_count;
|
||||
} else {
|
||||
highest_shard_count =
|
||||
highest_shard_count.max(ttid.tenant_shard_id.shard_count);
|
||||
}
|
||||
}
|
||||
None => {
|
||||
tenant_id = Some(ttid.tenant_shard_id.tenant_id);
|
||||
highest_shard_count = highest_shard_count.max(ttid.tenant_shard_id.shard_count);
|
||||
}
|
||||
}
|
||||
|
||||
match &data.blob_data {
|
||||
BlobDataParseResult::Parsed {
|
||||
index_part: _,
|
||||
index_part_generation: _index_part_generation,
|
||||
s3_layers,
|
||||
index_part_last_modified_time: _,
|
||||
index_part_snapshot_time: _,
|
||||
} => {
|
||||
tenant_objects.push(ttid, s3_layers.clone());
|
||||
}
|
||||
BlobDataParseResult::Relic => (),
|
||||
BlobDataParseResult::Incorrect {
|
||||
errors: _,
|
||||
s3_layers,
|
||||
} => {
|
||||
tenant_objects.push(ttid, s3_layers.clone());
|
||||
}
|
||||
}
|
||||
tenant_timeline_results.push((ttid, data));
|
||||
}
|
||||
|
||||
if !tenant_timeline_results.is_empty() {
|
||||
analyze_tenant(
|
||||
summary_inner.clone(),
|
||||
Arc::new(tokio::sync::Mutex::new(tenant_objects)),
|
||||
tenant_timeline_results,
|
||||
highest_shard_count,
|
||||
&mut consolidate_tenants_tx,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
Ok::<_, anyhow::Error>(())
|
||||
});
|
||||
|
||||
let remote_client_inner = remote_client.clone();
|
||||
let summary_inner = summary.clone();
|
||||
let analyze_tenants = tokio::spawn(async move {
|
||||
let stream = consolidate_tenants_rx
|
||||
.map(|(ttid, tenant_objects, data)| {
|
||||
let remote_client_inner = remote_client_inner.clone();
|
||||
async move {
|
||||
let generation = if let BlobDataParseResult::Parsed {
|
||||
index_part: _,
|
||||
index_part_generation,
|
||||
s3_layers: _,
|
||||
index_part_last_modified_time: _,
|
||||
index_part_snapshot_time: _,
|
||||
} = &data.blob_data
|
||||
{
|
||||
Some(*index_part_generation)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let res = branch_cleanup_and_check_errors(
|
||||
&remote_client_inner,
|
||||
&ttid,
|
||||
tenant_objects.clone(),
|
||||
None,
|
||||
None,
|
||||
Some(data),
|
||||
)
|
||||
.await;
|
||||
(ttid, tenant_objects.clone(), generation, res)
|
||||
}
|
||||
})
|
||||
.buffered(32);
|
||||
let mut last_tenant = None;
|
||||
let mut last_tenant_objects = None;
|
||||
let mut timeline_generations = HashMap::new();
|
||||
let mut stream = Box::pin(stream);
|
||||
while let Some((ttid, tenant_objects, generation, res)) = stream.next().await {
|
||||
if last_tenant != Some(ttid) {
|
||||
if let Some(tenant_id) = last_tenant {
|
||||
let timeline_generations = std::mem::take(&mut timeline_generations);
|
||||
identify_orphans(
|
||||
tenant_id.tenant_shard_id.tenant_id,
|
||||
last_tenant_objects.take().unwrap(),
|
||||
summary_inner.clone(),
|
||||
&timeline_generations,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
last_tenant = Some(ttid);
|
||||
last_tenant_objects = Some(tenant_objects);
|
||||
}
|
||||
if let Some(generation) = generation {
|
||||
timeline_generations.insert(ttid, generation);
|
||||
}
|
||||
{
|
||||
let mut guard = summary_inner.lock().await;
|
||||
guard.update_analysis(&ttid, &res, verbose);
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(tenant_id) = last_tenant {
|
||||
identify_orphans(
|
||||
tenant_id.tenant_shard_id.tenant_id,
|
||||
last_tenant_objects.take().unwrap(),
|
||||
summary_inner.clone(),
|
||||
&timeline_generations,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
Ok::<_, anyhow::Error>(())
|
||||
});
|
||||
// Generate a stream of TenantTimelineId
|
||||
let timelines = tenants.map_ok(|t| stream_tenant_timelines(&remote_client, &target, t));
|
||||
let timelines = timelines.try_buffered(CONCURRENCY);
|
||||
let timelines = timelines.try_flatten();
|
||||
|
||||
// Generate a stream of S3TimelineBlobData
|
||||
async fn report_on_timeline(
|
||||
@@ -387,94 +163,93 @@ pub async fn scan_pageserver_metadata(
|
||||
target: &RootTarget,
|
||||
ttid: TenantShardTimelineId,
|
||||
) -> anyhow::Result<(TenantShardTimelineId, RemoteTimelineBlobData)> {
|
||||
tracing::info!("listing blobs for timeline: {}", ttid);
|
||||
let data = list_timeline_blobs(remote_client, ttid, target).await?;
|
||||
Ok((ttid, data))
|
||||
}
|
||||
let timelines = timelines.map_ok(|ttid| report_on_timeline(&remote_client, &target, ttid));
|
||||
let mut timelines = std::pin::pin!(timelines.try_buffered(CONCURRENCY));
|
||||
|
||||
// We must gather all the TenantShardTimelineId->S3TimelineBlobData for each tenant, because different
|
||||
// shards in the same tenant might refer to one anothers' keys if a shard split has happened.
|
||||
|
||||
let mut tenant_id = None;
|
||||
let mut tenant_objects = TenantObjectListing::default();
|
||||
let mut tenant_timeline_results = Vec::new();
|
||||
|
||||
// DO NOT call any long-running tasks in this function; always route them through the channel and let
|
||||
// other tokio tasks handle them.
|
||||
async fn analyze_tenant(
|
||||
summary: Arc<tokio::sync::Mutex<MetadataSummary>>,
|
||||
tenant_objects: Arc<tokio::sync::Mutex<TenantObjectListing>>,
|
||||
remote_client: &GenericRemoteStorage,
|
||||
tenant_id: TenantId,
|
||||
summary: &mut MetadataSummary,
|
||||
mut tenant_objects: TenantObjectListing,
|
||||
timelines: Vec<(TenantShardTimelineId, RemoteTimelineBlobData)>,
|
||||
highest_shard_count: ShardCount,
|
||||
output_tx: &mut futures::channel::mpsc::Sender<(
|
||||
TenantShardTimelineId,
|
||||
Arc<tokio::sync::Mutex<TenantObjectListing>>,
|
||||
RemoteTimelineBlobData,
|
||||
)>,
|
||||
) -> anyhow::Result<()> {
|
||||
{
|
||||
let mut guard = summary.lock().await;
|
||||
guard.tenant_count += 1;
|
||||
}
|
||||
verbose: bool,
|
||||
) {
|
||||
summary.tenant_count += 1;
|
||||
|
||||
let mut timeline_ids = HashSet::new();
|
||||
let mut timeline_generations = HashMap::new();
|
||||
for (ttid, data) in timelines {
|
||||
async {
|
||||
if ttid.tenant_shard_id.shard_count == highest_shard_count {
|
||||
// Only analyze `TenantShardId`s with highest shard count.
|
||||
if ttid.tenant_shard_id.shard_count == highest_shard_count {
|
||||
// Only analyze `TenantShardId`s with highest shard count.
|
||||
|
||||
// Stash the generation of each timeline, for later use identifying orphan layers
|
||||
|
||||
if let BlobDataParseResult::Parsed {
|
||||
index_part,
|
||||
index_part_generation: _,
|
||||
s3_layers: _,
|
||||
index_part_last_modified_time: _,
|
||||
index_part_snapshot_time: _,
|
||||
} = &data.blob_data
|
||||
{
|
||||
if index_part.deleted_at.is_some() {
|
||||
// skip deleted timeline.
|
||||
tracing::info!("Skip analysis of {} b/c timeline is already deleted", ttid);
|
||||
return Ok(());
|
||||
// Stash the generation of each timeline, for later use identifying orphan layers
|
||||
if let BlobDataParseResult::Parsed {
|
||||
index_part,
|
||||
index_part_generation,
|
||||
s3_layers: _,
|
||||
index_part_last_modified_time: _,
|
||||
index_part_snapshot_time: _,
|
||||
} = &data.blob_data
|
||||
{
|
||||
if index_part.deleted_at.is_some() {
|
||||
// skip deleted timeline.
|
||||
tracing::info!(
|
||||
"Skip analysis of {} b/c timeline is already deleted",
|
||||
ttid
|
||||
);
|
||||
return;
|
||||
}
|
||||
timeline_generations.insert(ttid, *index_part_generation);
|
||||
}
|
||||
|
||||
// Apply checks to this timeline shard's metadata, and in the process update `tenant_objects`
|
||||
// reference counts for layers across the tenant.
|
||||
let analysis = branch_cleanup_and_check_errors(
|
||||
remote_client,
|
||||
&ttid,
|
||||
&mut tenant_objects,
|
||||
None,
|
||||
None,
|
||||
Some(data),
|
||||
)
|
||||
.await;
|
||||
summary.update_analysis(&ttid, &analysis, verbose);
|
||||
|
||||
timeline_ids.insert(ttid.timeline_id);
|
||||
} else {
|
||||
tracing::info!(
|
||||
"Skip analysis of {} b/c a lower shard count than {}",
|
||||
ttid,
|
||||
highest_shard_count.0,
|
||||
);
|
||||
}
|
||||
|
||||
// Apply checks to this timeline shard's metadata, and in the process update `tenant_objects`
|
||||
// reference counts for layers across the tenant.
|
||||
|
||||
output_tx.send((ttid, tenant_objects.clone(), data)).await?;
|
||||
|
||||
timeline_ids.insert(ttid.timeline_id);
|
||||
} else {
|
||||
tracing::info!(
|
||||
"Skip analysis of {} b/c a lower shard count than {}",
|
||||
ttid,
|
||||
highest_shard_count.0,
|
||||
);
|
||||
}
|
||||
Ok::<_, anyhow::Error>(())
|
||||
}.instrument(
|
||||
info_span!("analyze-timeline", shard = %ttid.tenant_shard_id.shard_slug(), timeline = %ttid.timeline_id),
|
||||
)
|
||||
.await?;
|
||||
.instrument(
|
||||
info_span!("analyze-timeline", shard = %ttid.tenant_shard_id.shard_slug(), timeline = %ttid.timeline_id),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
{
|
||||
let mut guard = summary.lock().await;
|
||||
guard.timeline_count += timeline_ids.len();
|
||||
}
|
||||
summary.timeline_count += timeline_ids.len();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn identify_orphans(
|
||||
tenant_id: TenantId,
|
||||
tenant_objects: Arc<tokio::sync::Mutex<TenantObjectListing>>,
|
||||
summary: Arc<tokio::sync::Mutex<MetadataSummary>>,
|
||||
timeline_generations: &HashMap<TenantShardTimelineId, Generation>,
|
||||
) {
|
||||
// Identifying orphan layers must be done on a tenant-wide basis, because individual
|
||||
// shards' layers may be referenced by other shards.
|
||||
//
|
||||
// Orphan layers are not a corruption, and not an indication of a problem. They are just
|
||||
// consuming some space in remote storage, and may be cleaned up at leisure.
|
||||
|
||||
let orphans = { tenant_objects.lock().await.get_orphans() };
|
||||
for (shard_index, timeline_id, layer_file, generation) in orphans {
|
||||
for (shard_index, timeline_id, layer_file, generation) in tenant_objects.get_orphans() {
|
||||
let ttid = TenantShardTimelineId {
|
||||
tenant_shard_id: TenantShardId {
|
||||
tenant_id,
|
||||
@@ -504,20 +279,83 @@ pub async fn scan_pageserver_metadata(
|
||||
|
||||
tracing::info!("Orphan layer detected: {orphan_path}");
|
||||
|
||||
{
|
||||
let mut guard = summary.lock().await;
|
||||
guard.notify_timeline_orphan(&ttid);
|
||||
}
|
||||
summary.notify_timeline_orphan(&ttid);
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: bail out early if any of the tasks fail
|
||||
list_tenants.await??;
|
||||
list_timelines.await??;
|
||||
read_timelines.await??;
|
||||
consolidate_tenants.await??;
|
||||
analyze_tenants.await??;
|
||||
// Iterate through all the timeline results. These are in key-order, so
|
||||
// all results for the same tenant will be adjacent. We accumulate these,
|
||||
// and then call `analyze_tenant` to flush, when we see the next tenant ID.
|
||||
let mut summary = MetadataSummary::new();
|
||||
let mut highest_shard_count = ShardCount::MIN;
|
||||
while let Some(i) = timelines.next().await {
|
||||
let (ttid, data) = i?;
|
||||
summary.update_data(&data);
|
||||
|
||||
let summary = summary.lock().await;
|
||||
Ok(summary.clone())
|
||||
match tenant_id {
|
||||
Some(prev_tenant_id) => {
|
||||
if prev_tenant_id != ttid.tenant_shard_id.tenant_id {
|
||||
// New tenant: analyze this tenant's timelines, clear accumulated tenant_timeline_results
|
||||
let tenant_objects = std::mem::take(&mut tenant_objects);
|
||||
let timelines = std::mem::take(&mut tenant_timeline_results);
|
||||
analyze_tenant(
|
||||
&remote_client,
|
||||
prev_tenant_id,
|
||||
&mut summary,
|
||||
tenant_objects,
|
||||
timelines,
|
||||
highest_shard_count,
|
||||
verbose,
|
||||
)
|
||||
.instrument(info_span!("analyze-tenant", tenant = %prev_tenant_id))
|
||||
.await;
|
||||
tenant_id = Some(ttid.tenant_shard_id.tenant_id);
|
||||
highest_shard_count = ttid.tenant_shard_id.shard_count;
|
||||
} else {
|
||||
highest_shard_count = highest_shard_count.max(ttid.tenant_shard_id.shard_count);
|
||||
}
|
||||
}
|
||||
None => {
|
||||
tenant_id = Some(ttid.tenant_shard_id.tenant_id);
|
||||
highest_shard_count = highest_shard_count.max(ttid.tenant_shard_id.shard_count);
|
||||
}
|
||||
}
|
||||
|
||||
match &data.blob_data {
|
||||
BlobDataParseResult::Parsed {
|
||||
index_part: _,
|
||||
index_part_generation: _index_part_generation,
|
||||
s3_layers,
|
||||
index_part_last_modified_time: _,
|
||||
index_part_snapshot_time: _,
|
||||
} => {
|
||||
tenant_objects.push(ttid, s3_layers.clone());
|
||||
}
|
||||
BlobDataParseResult::Relic => (),
|
||||
BlobDataParseResult::Incorrect {
|
||||
errors: _,
|
||||
s3_layers,
|
||||
} => {
|
||||
tenant_objects.push(ttid, s3_layers.clone());
|
||||
}
|
||||
}
|
||||
tenant_timeline_results.push((ttid, data));
|
||||
}
|
||||
|
||||
if !tenant_timeline_results.is_empty() {
|
||||
let tenant_id = tenant_id.expect("Must be set if results are present");
|
||||
analyze_tenant(
|
||||
&remote_client,
|
||||
tenant_id,
|
||||
&mut summary,
|
||||
tenant_objects,
|
||||
tenant_timeline_results,
|
||||
highest_shard_count,
|
||||
verbose,
|
||||
)
|
||||
.instrument(info_span!("analyze-tenant", tenant = %tenant_id))
|
||||
.await;
|
||||
}
|
||||
|
||||
Ok(summary)
|
||||
}
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import math # Add this import
|
||||
import os
|
||||
import time
|
||||
import traceback
|
||||
from pathlib import Path
|
||||
@@ -87,7 +88,10 @@ def test_cumulative_statistics_persistence(
|
||||
- insert additional tuples that by itself are not enough to trigger auto-vacuum but in combination with the previous tuples are
|
||||
- verify that autovacuum is triggered by the combination of tuples inserted before and after endpoint suspension
|
||||
"""
|
||||
project = neon_api.create_project(pg_version)
|
||||
project = neon_api.create_project(
|
||||
pg_version,
|
||||
f"Test cumulative statistics persistence, GITHUB_RUN_ID={os.getenv('GITHUB_RUN_ID')}",
|
||||
)
|
||||
project_id = project["project"]["id"]
|
||||
neon_api.wait_for_operation_to_finish(project_id)
|
||||
endpoint_id = project["endpoints"][0]["id"]
|
||||
|
||||
@@ -62,7 +62,9 @@ def test_ro_replica_lag(
|
||||
|
||||
pgbench_duration = f"-T{test_duration_min * 60 * 2}"
|
||||
|
||||
project = neon_api.create_project(pg_version)
|
||||
project = neon_api.create_project(
|
||||
pg_version, f"Test readonly replica lag, GITHUB_RUN_ID={os.getenv('GITHUB_RUN_ID')}"
|
||||
)
|
||||
project_id = project["project"]["id"]
|
||||
log.info("Project ID: %s", project_id)
|
||||
log.info("Primary endpoint ID: %s", project["endpoints"][0]["id"])
|
||||
@@ -195,7 +197,9 @@ def test_replication_start_stop(
|
||||
pgbench_duration = f"-T{2**num_replicas * configuration_test_time_sec}"
|
||||
error_occurred = False
|
||||
|
||||
project = neon_api.create_project(pg_version)
|
||||
project = neon_api.create_project(
|
||||
pg_version, f"Test replication start stop, GITHUB_RUN_ID={os.getenv('GITHUB_RUN_ID')}"
|
||||
)
|
||||
project_id = project["project"]["id"]
|
||||
log.info("Project ID: %s", project_id)
|
||||
log.info("Primary endpoint ID: %s", project["endpoints"][0]["id"])
|
||||
|
||||
@@ -202,6 +202,8 @@ def test_pageserver_gc_compaction_preempt(
|
||||
env = neon_env_builder.init_start(initial_tenant_conf=conf)
|
||||
|
||||
env.pageserver.allowed_errors.append(".*The timeline or pageserver is shutting down.*")
|
||||
env.pageserver.allowed_errors.append(".*flush task cancelled.*")
|
||||
env.pageserver.allowed_errors.append(".*failed to pipe.*")
|
||||
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
|
||||
28
test_runner/regress/test_gist.py
Normal file
28
test_runner/regress/test_gist.py
Normal file
@@ -0,0 +1,28 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from fixtures.neon_fixtures import NeonEnv
|
||||
|
||||
|
||||
#
|
||||
# Test unlogged build for GIST index
|
||||
#
|
||||
def test_gist(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
con = endpoint.connect()
|
||||
cur = con.cursor()
|
||||
iterations = 100
|
||||
|
||||
for _ in range(iterations):
|
||||
cur.execute(
|
||||
"CREATE TABLE pvactst (i INT, a INT[], p POINT) with (autovacuum_enabled = off)"
|
||||
)
|
||||
cur.execute(
|
||||
"INSERT INTO pvactst SELECT i, array[1,2,3], point(i, i+1) FROM generate_series(1,1000) i"
|
||||
)
|
||||
cur.execute("CREATE INDEX gist_pvactst ON pvactst USING gist (p)")
|
||||
cur.execute("VACUUM pvactst")
|
||||
cur.execute("DROP TABLE pvactst")
|
||||
2
vendor/postgres-v14
vendored
2
vendor/postgres-v14
vendored
Submodule vendor/postgres-v14 updated: d3c9d61fb7...c8dab02bfc
2
vendor/postgres-v15
vendored
2
vendor/postgres-v15
vendored
Submodule vendor/postgres-v15 updated: 8ecb12f21d...b838c8969b
2
vendor/postgres-v16
vendored
2
vendor/postgres-v16
vendored
Submodule vendor/postgres-v16 updated: 37496f87b5...05ddf212e2
6
vendor/revisions.json
vendored
6
vendor/revisions.json
vendored
@@ -5,14 +5,14 @@
|
||||
],
|
||||
"v16": [
|
||||
"16.8",
|
||||
"37496f87b5324af53c56127e278ee5b1e8435253"
|
||||
"05ddf212e2e07b788b5c8b88bdcf98630941f6ae"
|
||||
],
|
||||
"v15": [
|
||||
"15.12",
|
||||
"8ecb12f21d862dfa39f7204b8f5e1c00a2a225b3"
|
||||
"b838c8969b7c63f3e637a769656f5f36793b797c"
|
||||
],
|
||||
"v14": [
|
||||
"14.17",
|
||||
"d3c9d61fb7a362a165dac7060819dd9d6ad68c28"
|
||||
"c8dab02bfc003ae7bd59096919042d7840f3c194"
|
||||
]
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user