Compare commits

..

12 Commits

Author SHA1 Message Date
Arpad Müller
eb0b80e3ea Increase partial backup timeout to 3 hours 2024-05-13 16:57:31 +02:00
Joonas Koivunen
4d8a10af1c fix: do not create metrics contention from background task permit (#7730)
The background task loop permit metrics do two of `with_label_values`
very often. Change the codepath to cache the counters on first access
into a `Lazy` with `enum_map::EnumMap`. The expectation is that this
should not fix for metric collection failures under load, but it doesn't
hurt.

Cc: #7161
2024-05-13 17:49:50 +03:00
Alexander Bayandin
55ba885f6b CI(report-benchmarks-failures): report benchmarks failures to slack (#7678)
## Problem

`benchmarks` job that we run on the main doesn't block anything, so it's
easy to miss its failure.

Ref https://github.com/neondatabase/cloud/issues/13087

## Summary of changes
- Add `report-benchmarks-failures` job that report failures of
`benchmarks` job to a Slack channel
2024-05-13 14:16:03 +01:00
Christian Schwarz
6ff74295b5 chore(pageserver): plumb through RequestContext to VirtualFile open methods (#7725)
This PR introduces no functional changes.

The `open()` path will be done separately.

refs https://github.com/neondatabase/neon/issues/6107
refs https://github.com/neondatabase/neon/issues/7386

Co-authored-by: Joonas Koivunen <joonas@neon.tech>
2024-05-13 14:52:06 +02:00
Vlad Lazar
bbe730d7ca Revert protocol version upgrade (#7727)
## Problem

"John pointed out that the switch to protocol version 2 made
test_gc_aggressive test flaky:
https://github.com/neondatabase/neon/issues/7692.
I tracked it down, and that is indeed an issue. Conditions for hitting
the issue:
The problem occurs in the primary
GC horizon is set to a very low value, e.g. 0.
If the primary is actively writing WAL, and GC runs in the pageserver at
the same time that the primary sends a GetPage request, it's possible
that the GC advances the GC horizon past the GetPage request's LSN. I'm
working on a fix here: https://github.com/neondatabase/neon/pull/7708."
- Heikki

## Summary of changes
Use protocol version 1 as default.
2024-05-13 13:41:14 +01:00
Jure Bajic
5a0da93c53 Fix test_lock_time_tracing flakiness (#7712)
## Problem

Closes
[test_lock_time_tracing](https://github.com/neondatabase/neon/issues/7691)

## Summary of changes

Taking a look at the execution of the same test in logs, it can be
concluded that the time we are holding the lock is sometimes not
enough(must be above 30s) to cause the second log to be shown by the
thread that is creating a timeline.

In the [successful
execution](https://neon-github-public-dev.s3.amazonaws.com/reports/pr-7663/9021247520/index.html#testresult/a21bce8c702b37f0)
it can be seen that the log `Operation TimelineCreate on key
5e088fc2dd14945020d0fa6d9efd1e36 has waited 30.000887709s for shared
lock` was on the edge of being logged, if it was below 30s it would not
be shown.

```
2024-05-09T18:02:32.552093Z  WARN request{method=PUT path=/control/v1/tenant/5e088fc2dd14945020d0fa6d9efd1e36/policy request_id=af7e4a04-d181-4acb-952f-9597c8eba5a8}: Lock on UpdatePolicy was held for 31.001892592s
2024-05-09T18:02:32.552109Z  INFO request{method=PUT path=/control/v1/tenant/5e088fc2dd14945020d0fa6d9efd1e36/policy request_id=af7e4a04-d181-4acb-952f-9597c8eba5a8}: Request handled, status: 200 OK
2024-05-09T18:02:32.552271Z  WARN request{method=POST path=/v1/tenant/5e088fc2dd14945020d0fa6d9efd1e36/timeline request_id=d3af756e-dbb3-476b-89bd-3594f19bbb67}: Operation TimelineCreate on key 5e088fc2dd14945020d0fa6d9efd1e36 has waited 30.000887709s for shared lock
```

In the [failed
execution](https://neon-github-public-dev.s3.amazonaws.com/reports/pr-7663/9022743601/index.html#/testresult/deb90136aeae4fce):
```
2024-05-09T20:14:33.526311Z  INFO request{method=POST path=/v1/tenant/68194ffadb61ca11adcbb11cbeb4ec6e/timeline request_id=1daa8c31-522d-4805-9114-68cdcffb9823}: Creating timeline 68194ffadb61ca11adcbb11cbeb4ec6e/f72185990ed13f0b0533383f81d877af
2024-05-09T20:14:36.441165Z  INFO Heartbeat round complete for 1 nodes, 0 offline
2024-05-09T20:14:41.441657Z  INFO Heartbeat round complete for 1 nodes, 0 offline
2024-05-09T20:14:41.535227Z  INFO request{method=POST path=/upcall/v1/validate request_id=94a7be88-474e-4163-92f8-57b401473add}: Handling request
2024-05-09T20:14:41.535269Z  INFO request{method=POST path=/upcall/v1/validate request_id=94a7be88-474e-4163-92f8-57b401473add}: handle_validate: 68194ffadb61ca11adcbb11cbeb4ec6e(gen 1): valid=true (latest Some(00000001))
2024-05-09T20:14:41.535284Z  INFO request{method=POST path=/upcall/v1/validate request_id=94a7be88-474e-4163-92f8-57b401473add}: Request handled, status: 200 OK
2024-05-09T20:14:46.441854Z  INFO Heartbeat round complete for 1 nodes, 0 offline
2024-05-09T20:14:51.441151Z  INFO Heartbeat round complete for 1 nodes, 0 offline
2024-05-09T20:14:56.441199Z  INFO Heartbeat round complete for 1 nodes, 0 offline
2024-05-09T20:15:01.440971Z  INFO Heartbeat round complete for 1 nodes, 0 offline
2024-05-09T20:15:03.516320Z  INFO request{method=PUT path=/control/v1/tenant/68194ffadb61ca11adcbb11cbeb4ec6e/policy request_id=0edfdb5b-2b05-486b-9879-d83f234d2f0d}: failpoint "tenant-update-policy-exclusive-lock": sleep done
2024-05-09T20:15:03.518474Z  INFO request{method=PUT path=/control/v1/tenant/68194ffadb61ca11adcbb11cbeb4ec6e/policy request_id=0edfdb5b-2b05-486b-9879-d83f234d2f0d}: Updated scheduling policy to Stop tenant_id=68194ffadb61ca11adcbb11cbeb4ec6e shard_id=0000
2024-05-09T20:15:03.518512Z  WARN request{method=PUT path=/control/v1/tenant/68194ffadb61ca11adcbb11cbeb4ec6e/policy request_id=0edfdb5b-2b05-486b-9879-d83f234d2f0d}: Scheduling is disabled by policy Stop tenant_id=68194ffadb61ca11adcbb11cbeb4ec6e shard_id=0000
2024-05-09T20:15:03.518540Z  WARN request{method=PUT path=/control/v1/tenant/68194ffadb61ca11adcbb11cbeb4ec6e/policy request_id=0edfdb5b-2b05-486b-9879-d83f234d2f0d}: Lock on UpdatePolicy was held for 31.003712703s
2024-05-09T20:15:03.518570Z  INFO request{method=PUT path=/control/v1/tenant/68194ffadb61ca11adcbb11cbeb4ec6e/policy request_id=0edfdb5b-2b05-486b-9879-d83f234d2f0d}: Request handled, status: 200 OK
2024-05-09T20:15:03.518804Z  WARN request{method=POST path=/v1/tenant/68194ffadb61ca11adcbb11cbeb4ec6e/timeline request_id=1daa8c31-522d-4805-9114-68cdcffb9823}: Scheduling is disabled by policy Stop tenant_id=68194ffadb61ca11adcbb11cbeb4ec6e shard_id=0000
2024-05-09T20:15:03.518815Z  INFO request{method=POST path=/v1/tenant/68194ffadb61ca11adcbb11cbeb4ec6e/timeline request_id=1daa8c31-522d-4805-9114-68cdcffb9823}: Creating timeline on shard 68194ffadb61ca11adcbb11cbeb4ec6e/f72185990ed13f0b0533383f81d877af, attached to node 1 (localhost)
```
we can see that the difference between starting to create timeline
`2024-05-09T20:14:33.526311Z` and creating timeline
`2024-05-09T20:15:03.518815Z` is not above 30s and will not cause any
logs to appear.

The proposed solution is to prolong how long we will pause to ensure
that the thread that creates the timeline waits above 30s.
2024-05-13 13:18:14 +01:00
Joonas Koivunen
d9dcbffac3 python: allow using allowed_errors.py (#7719)
See #7718. Fix it by renaming all `types.py` to `common_types.py`.

Additionally, add an advert for using `allowed_errors.py` to test any
added regex.
2024-05-13 15:16:23 +03:00
John Spray
f50ff14560 pageserver: refuse to run without remote storage (#7722)
## Problem

Since https://github.com/neondatabase/neon/pull/6769, the pageserver is
intentionally not usable without remote storage: it's purpose is to act
as a cache to an object store, rather than as a source of truth in its
own right.

## Summary of changes

- Make remote storage configuration mandatory: the pageserver will
refuse to start if it is not provided.

This is a precursor that will make it safe to subsequently remove all
the internal Option<>s
2024-05-13 13:05:46 +01:00
Christian Schwarz
b58a615197 chore(pageserver): plumb through RequestContext to VirtualFile read methods (#7720)
This PR introduces no functional changes.

The `open()` path will be done separately.

refs https://github.com/neondatabase/neon/issues/6107
refs https://github.com/neondatabase/neon/issues/7386
2024-05-13 09:22:10 +00:00
Joonas Koivunen
1a1d527875 test: allow vectored get validation failure during shutdown (#7716)
Per [evidence] the timeline ancestor detach tests can panic while
shutting down on vectored get validation. Allow the error because tenant
is restarted twice in the test.

[evidence]:
https://neon-github-public-dev.s3.amazonaws.com/reports/pr-7708/9058185709/index.html#suites/a1c2be32556270764423c495fad75d47/d444f7e5c0a18ce9
2024-05-13 09:21:49 +00:00
Joonas Koivunen
216fc5ba7b test: fix confusing limit and logging (#7589)
The test has been flaky since 2024-04-11 for unknown reason, and the
logging was off. Fix the logging and raise the limit a bit. The
problematic ratio reproduces with pg14 and added sleep (not included)
but not on pg15. The new ratio abs diff limit works for all inspected
examples.

Cc: #7536
2024-05-13 11:56:07 +03:00
Joonas Koivunen
4270e86eb2 test(ancestor detach): verify with fullbackup (#7706)
In timeline detach ancestor tests there is no way to really be sure that
there were no subtle off-by one bugs. One such bug is demoed and
reverted. Add verifying fullbackup is equal before and after detaching
ancestor.

Fullbackup is expected to be equal apart from `zenith.signal`, which is
known to be good because endpoint can be started without the detached
branch receiving writes.
2024-05-13 10:58:03 +03:00
95 changed files with 458 additions and 204 deletions

View File

@@ -546,9 +546,27 @@ jobs:
# XXX: no coverage data handling here, since benchmarks are run on release builds,
# while coverage is currently collected for the debug ones
report-benchmarks-failures:
needs: [ benchmarks, create-test-report ]
if: github.ref_name == 'main' && failure()
runs-on: ubuntu-latest
steps:
- uses: slackapi/slack-github-action@v1
with:
channel-id: C060CNA47S9 # on-call-staging-storage-stream
slack-message: |
Benchmarks failed on main: ${{ github.event.head_commit.url }}
Allure report: ${{ needs.create-test-report.outputs.report-url }}
env:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
create-test-report:
needs: [ check-permissions, regress-tests, coverage-report, benchmarks, build-build-tools-image ]
if: ${{ !cancelled() && contains(fromJSON('["skipped", "success"]'), needs.check-permissions.result) }}
outputs:
report-url: ${{ steps.create-allure-report.outputs.report-url }}
runs-on: [ self-hosted, gen3, small ]
container:

View File

@@ -100,7 +100,7 @@ pub(crate) fn parse_filename(name: &str) -> Option<LayerFile> {
// Finds the max_holes largest holes, ignoring any that are smaller than MIN_HOLE_LENGTH"
async fn get_holes(path: &Utf8Path, max_holes: usize, ctx: &RequestContext) -> Result<Vec<Hole>> {
let file = VirtualFile::open(path).await?;
let file = VirtualFile::open(path, ctx).await?;
let file_id = page_cache::next_file_id();
let block_reader = FileBlockReader::new(&file, file_id);
let summary_blk = block_reader.read_blk(0, ctx).await?;

View File

@@ -61,7 +61,7 @@ async fn read_delta_file(path: impl AsRef<Path>, ctx: &RequestContext) -> Result
let path = Utf8Path::from_path(path.as_ref()).expect("non-Unicode path");
virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
page_cache::init(100);
let file = VirtualFile::open(path).await?;
let file = VirtualFile::open(path, ctx).await?;
let file_id = page_cache::next_file_id();
let block_reader = FileBlockReader::new(&file, file_id);
let summary_blk = block_reader.read_blk(0, ctx).await?;

View File

@@ -383,7 +383,7 @@ fn start_pageserver(
let shutdown_pageserver = tokio_util::sync::CancellationToken::new();
// Set up remote storage client
let remote_storage = create_remote_storage_client(conf)?;
let remote_storage = Some(create_remote_storage_client(conf)?);
// Set up deletion queue
let (deletion_queue, deletion_workers) = DeletionQueue::new(
@@ -708,12 +708,11 @@ fn start_pageserver(
fn create_remote_storage_client(
conf: &'static PageServerConf,
) -> anyhow::Result<Option<GenericRemoteStorage>> {
) -> anyhow::Result<GenericRemoteStorage> {
let config = if let Some(config) = &conf.remote_storage_config {
config
} else {
tracing::warn!("no remote storage configured, this is a deprecated configuration");
return Ok(None);
anyhow::bail!("no remote storage configured, this is a deprecated configuration");
};
// Create the client
@@ -733,7 +732,7 @@ fn create_remote_storage_client(
GenericRemoteStorage::unreliable_wrapper(remote_storage, conf.test_remote_failures);
}
Ok(Some(remote_storage))
Ok(remote_storage)
}
fn cli() -> Command {

View File

@@ -1671,7 +1671,7 @@ impl<'a> DatadirModification<'a> {
}
if !self.pending_deletions.is_empty() {
writer.delete_batch(&self.pending_deletions).await?;
writer.delete_batch(&self.pending_deletions, ctx).await?;
self.pending_deletions.clear();
}

View File

@@ -299,7 +299,7 @@ mod tests {
// Write part (in block to drop the file)
let mut offsets = Vec::new();
{
let file = VirtualFile::create(pathbuf.as_path()).await?;
let file = VirtualFile::create(pathbuf.as_path(), &ctx).await?;
let mut wtr = BlobWriter::<BUFFERED>::new(file, 0);
for blob in blobs.iter() {
let (_, res) = wtr.write_blob(blob.clone(), &ctx).await;
@@ -314,7 +314,7 @@ mod tests {
wtr.flush_buffer(&ctx).await?;
}
let file = VirtualFile::open(pathbuf.as_path()).await?;
let file = VirtualFile::open(pathbuf.as_path(), &ctx).await?;
let rdr = BlockReaderRef::VirtualFile(&file);
let rdr = BlockCursor::new(rdr);
for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {

View File

@@ -102,7 +102,7 @@ impl<'a> BlockReaderRef<'a> {
#[cfg(test)]
TestDisk(r) => r.read_blk(blknum),
#[cfg(test)]
VirtualFile(r) => r.read_blk(blknum).await,
VirtualFile(r) => r.read_blk(blknum, ctx).await,
}
}
}
@@ -177,10 +177,11 @@ impl<'a> FileBlockReader<'a> {
&self,
buf: PageWriteGuard<'static>,
blkno: u32,
ctx: &RequestContext,
) -> Result<PageWriteGuard<'static>, std::io::Error> {
assert!(buf.len() == PAGE_SZ);
self.file
.read_exact_at_page(buf, blkno as u64 * PAGE_SZ as u64)
.read_exact_at_page(buf, blkno as u64 * PAGE_SZ as u64, ctx)
.await
}
/// Read a block.
@@ -206,7 +207,7 @@ impl<'a> FileBlockReader<'a> {
ReadBufResult::Found(guard) => Ok(guard.into()),
ReadBufResult::NotFound(write_guard) => {
// Read the page from disk into the buffer
let write_guard = self.fill_buffer(write_guard, blknum).await?;
let write_guard = self.fill_buffer(write_guard, blknum, ctx).await?;
Ok(write_guard.mark_valid().into())
}
}

View File

@@ -28,6 +28,7 @@ impl EphemeralFile {
conf: &PageServerConf,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
ctx: &RequestContext,
) -> Result<EphemeralFile, io::Error> {
static NEXT_FILENAME: AtomicU64 = AtomicU64::new(1);
let filename_disambiguator =
@@ -45,6 +46,7 @@ impl EphemeralFile {
.read(true)
.write(true)
.create(true),
ctx,
)
.await?;
@@ -153,7 +155,7 @@ mod tests {
async fn test_ephemeral_blobs() -> Result<(), io::Error> {
let (conf, tenant_id, timeline_id, ctx) = harness("ephemeral_blobs")?;
let mut file = EphemeralFile::create(conf, tenant_id, timeline_id).await?;
let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &ctx).await?;
let pos_foo = file.write_blob(b"foo", &ctx).await?;
assert_eq!(

View File

@@ -78,7 +78,7 @@ impl RW {
page_cache::ReadBufResult::NotFound(write_guard) => {
let write_guard = writer
.file
.read_exact_at_page(write_guard, blknum as u64 * PAGE_SZ as u64)
.read_exact_at_page(write_guard, blknum as u64 * PAGE_SZ as u64, ctx)
.await?;
let read_guard = write_guard.mark_valid();
return Ok(BlockLease::PageReadGuard(read_guard));

View File

@@ -112,14 +112,17 @@ pub async fn download_layer_file<'a>(
// We use fatal_err() below because the after the rename above,
// the in-memory state of the filesystem already has the layer file in its final place,
// and subsequent pageserver code could think it's durable while it really isn't.
let work = async move {
let timeline_dir = VirtualFile::open(&timeline_path)
.await
.fatal_err("VirtualFile::open for timeline dir fsync");
timeline_dir
.sync_all()
.await
.fatal_err("VirtualFile::sync_all timeline dir");
let work = {
let ctx = ctx.detached_child(ctx.task_kind(), ctx.download_behavior());
async move {
let timeline_dir = VirtualFile::open(&timeline_path, &ctx)
.await
.fatal_err("VirtualFile::open for timeline dir fsync");
timeline_dir
.sync_all()
.await
.fatal_err("VirtualFile::sync_all timeline dir");
}
};
crate::virtual_file::io_engine::get()
.spawn_blocking_and_block_on_if_std(work)
@@ -196,7 +199,7 @@ async fn download_object<'a>(
use crate::virtual_file::owned_buffers_io::{self, util::size_tracking_writer};
use bytes::BytesMut;
async {
let destination_file = VirtualFile::create(dst_path)
let destination_file = VirtualFile::create(dst_path, ctx)
.await
.with_context(|| format!("create a destination file for layer '{dst_path}'"))
.map_err(DownloadError::Other)?;

View File

@@ -394,6 +394,7 @@ impl DeltaLayerWriterInner {
tenant_shard_id: TenantShardId,
key_start: Key,
lsn_range: Range<Lsn>,
ctx: &RequestContext,
) -> anyhow::Result<Self> {
// Create the file initially with a temporary filename. We don't know
// the end key yet, so we cannot form the final filename yet. We will
@@ -404,7 +405,7 @@ impl DeltaLayerWriterInner {
let path =
DeltaLayer::temp_path_for(conf, &tenant_shard_id, &timeline_id, key_start, &lsn_range);
let mut file = VirtualFile::create(&path).await?;
let mut file = VirtualFile::create(&path, ctx).await?;
// make room for the header block
file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
@@ -586,6 +587,7 @@ impl DeltaLayerWriter {
tenant_shard_id: TenantShardId,
key_start: Key,
lsn_range: Range<Lsn>,
ctx: &RequestContext,
) -> anyhow::Result<Self> {
Ok(Self {
inner: Some(
@@ -595,6 +597,7 @@ impl DeltaLayerWriter {
tenant_shard_id,
key_start,
lsn_range,
ctx,
)
.await?,
),
@@ -701,6 +704,7 @@ impl DeltaLayer {
let mut file = VirtualFile::open_with_options(
path,
virtual_file::OpenOptions::new().read(true).write(true),
ctx,
)
.await
.with_context(|| format!("Failed to open file '{}'", path))?;
@@ -734,7 +738,7 @@ impl DeltaLayerInner {
max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
ctx: &RequestContext,
) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
let file = match VirtualFile::open(path).await {
let file = match VirtualFile::open(path, ctx).await {
Ok(file) => file,
Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
};
@@ -908,7 +912,7 @@ impl DeltaLayerInner {
.await
.map_err(GetVectoredError::Other)?;
self.do_reads_and_update_state(reads, reconstruct_state)
self.do_reads_and_update_state(reads, reconstruct_state, ctx)
.await;
reconstruct_state.on_lsn_advanced(&keyspace, self.lsn_range.start);
@@ -1012,6 +1016,7 @@ impl DeltaLayerInner {
&self,
reads: Vec<VectoredRead>,
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
) {
let vectored_blob_reader = VectoredBlobReader::new(&self.file);
let mut ignore_key_with_err = None;
@@ -1029,7 +1034,7 @@ impl DeltaLayerInner {
// track when a key is done.
for read in reads.into_iter().rev() {
let res = vectored_blob_reader
.read_blobs(&read, buf.take().expect("Should have a buffer"))
.read_blobs(&read, buf.take().expect("Should have a buffer"), ctx)
.await;
let blobs_buf = match res {
@@ -1274,7 +1279,7 @@ impl DeltaLayerInner {
buf.clear();
buf.reserve(read.size());
let res = reader.read_blobs(&read, buf).await?;
let res = reader.read_blobs(&read, buf, ctx).await?;
for blob in res.blobs {
let key = blob.meta.key;
@@ -1791,6 +1796,7 @@ mod test {
harness.tenant_shard_id,
entries_meta.key_range.start,
entries_meta.lsn_range.clone(),
&ctx,
)
.await?;
@@ -1848,7 +1854,7 @@ mod test {
for read in vectored_reads {
let blobs_buf = vectored_blob_reader
.read_blobs(&read, buf.take().expect("Should have a buffer"))
.read_blobs(&read, buf.take().expect("Should have a buffer"), &ctx)
.await?;
for meta in blobs_buf.blobs.iter() {
let value = &blobs_buf.buf[meta.start..meta.end];
@@ -1978,6 +1984,7 @@ mod test {
tenant.tenant_shard_id,
Key::MIN,
Lsn(0x11)..truncate_at,
ctx,
)
.await
.unwrap();

View File

@@ -343,6 +343,7 @@ impl ImageLayer {
let mut file = VirtualFile::open_with_options(
path,
virtual_file::OpenOptions::new().read(true).write(true),
ctx,
)
.await
.with_context(|| format!("Failed to open file '{}'", path))?;
@@ -377,7 +378,7 @@ impl ImageLayerInner {
max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
ctx: &RequestContext,
) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
let file = match VirtualFile::open(path).await {
let file = match VirtualFile::open(path, ctx).await {
Ok(file) => file,
Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
};
@@ -474,7 +475,7 @@ impl ImageLayerInner {
.await
.map_err(GetVectoredError::Other)?;
self.do_reads_and_update_state(reads, reconstruct_state)
self.do_reads_and_update_state(reads, reconstruct_state, ctx)
.await;
Ok(())
@@ -537,6 +538,7 @@ impl ImageLayerInner {
&self,
reads: Vec<VectoredRead>,
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
) {
let max_vectored_read_bytes = self
.max_vectored_read_bytes
@@ -565,7 +567,7 @@ impl ImageLayerInner {
}
let buf = BytesMut::with_capacity(buf_size);
let res = vectored_blob_reader.read_blobs(&read, buf).await;
let res = vectored_blob_reader.read_blobs(&read, buf, ctx).await;
match res {
Ok(blobs_buf) => {
@@ -631,6 +633,7 @@ impl ImageLayerWriterInner {
tenant_shard_id: TenantShardId,
key_range: &Range<Key>,
lsn: Lsn,
ctx: &RequestContext,
) -> anyhow::Result<Self> {
// Create the file initially with a temporary filename.
// We'll atomically rename it to the final name when we're done.
@@ -650,6 +653,7 @@ impl ImageLayerWriterInner {
virtual_file::OpenOptions::new()
.write(true)
.create_new(true),
ctx,
)
.await?
};
@@ -804,10 +808,11 @@ impl ImageLayerWriter {
tenant_shard_id: TenantShardId,
key_range: &Range<Key>,
lsn: Lsn,
ctx: &RequestContext,
) -> anyhow::Result<ImageLayerWriter> {
Ok(Self {
inner: Some(
ImageLayerWriterInner::new(conf, timeline_id, tenant_shard_id, key_range, lsn)
ImageLayerWriterInner::new(conf, timeline_id, tenant_shard_id, key_range, lsn, ctx)
.await?,
),
})

View File

@@ -473,10 +473,11 @@ impl InMemoryLayer {
timeline_id: TimelineId,
tenant_shard_id: TenantShardId,
start_lsn: Lsn,
ctx: &RequestContext,
) -> Result<InMemoryLayer> {
trace!("initializing new empty InMemoryLayer for writing on timeline {timeline_id} at {start_lsn}");
let file = EphemeralFile::create(conf, tenant_shard_id, timeline_id).await?;
let file = EphemeralFile::create(conf, tenant_shard_id, timeline_id, ctx).await?;
let key = InMemoryLayerFileId(file.page_cache_file_id());
Ok(InMemoryLayer {
@@ -642,6 +643,7 @@ impl InMemoryLayer {
self.tenant_shard_id,
Key::MIN,
self.start_lsn..end_lsn,
ctx,
)
.await?;

View File

@@ -41,7 +41,7 @@ static CONCURRENT_BACKGROUND_TASKS: once_cell::sync::Lazy<tokio::sync::Semaphore
tokio::sync::Semaphore::new(permits)
});
#[derive(Debug, PartialEq, Eq, Clone, Copy, strum_macros::IntoStaticStr)]
#[derive(Debug, PartialEq, Eq, Clone, Copy, strum_macros::IntoStaticStr, enum_map::Enum)]
#[strum(serialize_all = "snake_case")]
pub(crate) enum BackgroundLoopKind {
Compaction,
@@ -57,19 +57,25 @@ pub(crate) enum BackgroundLoopKind {
impl BackgroundLoopKind {
fn as_static_str(&self) -> &'static str {
let s: &'static str = self.into();
s
self.into()
}
}
static PERMIT_GAUGES: once_cell::sync::Lazy<
enum_map::EnumMap<BackgroundLoopKind, metrics::IntCounterPair>,
> = once_cell::sync::Lazy::new(|| {
enum_map::EnumMap::from_array(std::array::from_fn(|i| {
let kind = <BackgroundLoopKind as enum_map::Enum>::from_usize(i);
crate::metrics::BACKGROUND_LOOP_SEMAPHORE_WAIT_GAUGE.with_label_values(&[kind.into()])
}))
});
/// Cancellation safe.
pub(crate) async fn concurrent_background_tasks_rate_limit_permit(
loop_kind: BackgroundLoopKind,
_ctx: &RequestContext,
) -> tokio::sync::SemaphorePermit<'static> {
let _guard = crate::metrics::BACKGROUND_LOOP_SEMAPHORE_WAIT_GAUGE
.with_label_values(&[loop_kind.as_static_str()])
.guard();
let _guard = PERMIT_GAUGES[loop_kind].guard();
pausable_failpoint!(
"initial-size-calculation-permit-pause",

View File

@@ -3560,7 +3560,11 @@ impl Timeline {
///
/// Get a handle to the latest layer for appending.
///
async fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result<Arc<InMemoryLayer>> {
async fn get_layer_for_write(
&self,
lsn: Lsn,
ctx: &RequestContext,
) -> anyhow::Result<Arc<InMemoryLayer>> {
let mut guard = self.layers.write().await;
let layer = guard
.get_layer_for_write(
@@ -3569,6 +3573,7 @@ impl Timeline {
self.conf,
self.timeline_id,
self.tenant_shard_id,
ctx,
)
.await?;
Ok(layer)
@@ -3833,8 +3838,8 @@ impl Timeline {
);
self.create_delta_layer(
&frozen_layer,
ctx,
Some(metadata_keyspace.0.ranges[0].clone()),
ctx,
)
.await?
} else {
@@ -3863,7 +3868,7 @@ impl Timeline {
// Normal case, write out a L0 delta layer file.
// `create_delta_layer` will not modify the layer map.
// We will remove frozen layer and add delta layer in one atomic operation later.
let Some(layer) = self.create_delta_layer(&frozen_layer, ctx, None).await? else {
let Some(layer) = self.create_delta_layer(&frozen_layer, None, ctx).await? else {
panic!("delta layer cannot be empty if no filter is applied");
};
(
@@ -3992,8 +3997,8 @@ impl Timeline {
async fn create_delta_layer(
self: &Arc<Self>,
frozen_layer: &Arc<InMemoryLayer>,
ctx: &RequestContext,
key_range: Option<Range<Key>>,
ctx: &RequestContext,
) -> anyhow::Result<Option<ResidentLayer>> {
let self_clone = Arc::clone(self);
let frozen_layer = Arc::clone(frozen_layer);
@@ -4016,6 +4021,7 @@ impl Timeline {
&self_clone
.conf
.timeline_path(&self_clone.tenant_shard_id, &self_clone.timeline_id),
&ctx,
)
.await
.fatal_err("VirtualFile::open for timeline dir fsync");
@@ -4209,6 +4215,7 @@ impl Timeline {
self.tenant_shard_id,
&img_range,
lsn,
ctx,
)
.await?;
@@ -4313,6 +4320,7 @@ impl Timeline {
&self
.conf
.timeline_path(&self.tenant_shard_id, &self.timeline_id),
ctx,
)
.await
.fatal_err("VirtualFile::open for timeline dir fsync");
@@ -5214,7 +5222,7 @@ impl<'a> TimelineWriter<'a> {
let buf_size: u64 = buf.len().try_into().expect("oversized value buf");
let action = self.get_open_layer_action(lsn, buf_size);
let layer = self.handle_open_layer_action(lsn, action).await?;
let layer = self.handle_open_layer_action(lsn, action, ctx).await?;
let res = layer.put_value(key, lsn, &buf, ctx).await;
if res.is_ok() {
@@ -5237,14 +5245,15 @@ impl<'a> TimelineWriter<'a> {
&mut self,
at: Lsn,
action: OpenLayerAction,
ctx: &RequestContext,
) -> anyhow::Result<&Arc<InMemoryLayer>> {
match action {
OpenLayerAction::Roll => {
let freeze_at = self.write_guard.as_ref().unwrap().max_lsn.unwrap();
self.roll_layer(freeze_at).await?;
self.open_layer(at).await?;
self.open_layer(at, ctx).await?;
}
OpenLayerAction::Open => self.open_layer(at).await?,
OpenLayerAction::Open => self.open_layer(at, ctx).await?,
OpenLayerAction::None => {
assert!(self.write_guard.is_some());
}
@@ -5253,8 +5262,8 @@ impl<'a> TimelineWriter<'a> {
Ok(&self.write_guard.as_ref().unwrap().open_layer)
}
async fn open_layer(&mut self, at: Lsn) -> anyhow::Result<()> {
let layer = self.tl.get_layer_for_write(at).await?;
async fn open_layer(&mut self, at: Lsn, ctx: &RequestContext) -> anyhow::Result<()> {
let layer = self.tl.get_layer_for_write(at, ctx).await?;
let initial_size = layer.size().await?;
let last_freeze_at = self.last_freeze_at.load();
@@ -5331,10 +5340,14 @@ impl<'a> TimelineWriter<'a> {
Ok(())
}
pub(crate) async fn delete_batch(&mut self, batch: &[(Range<Key>, Lsn)]) -> anyhow::Result<()> {
pub(crate) async fn delete_batch(
&mut self,
batch: &[(Range<Key>, Lsn)],
ctx: &RequestContext,
) -> anyhow::Result<()> {
if let Some((_, lsn)) = batch.first() {
let action = self.get_open_layer_action(*lsn, 0);
let layer = self.handle_open_layer_action(*lsn, action).await?;
let layer = self.handle_open_layer_action(*lsn, action, ctx).await?;
layer.put_tombstones(batch).await?;
}

View File

@@ -700,6 +700,7 @@ impl Timeline {
debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
lsn_range.clone()
},
ctx,
)
.await?,
);
@@ -755,6 +756,7 @@ impl Timeline {
&self
.conf
.timeline_path(&self.tenant_shard_id, &self.timeline_id),
ctx,
)
.await
.fatal_err("VirtualFile::open for timeline dir fsync");
@@ -1093,6 +1095,7 @@ impl CompactionJobExecutor for TimelineAdaptor {
self.timeline.tenant_shard_id,
key_range.start,
lsn_range.clone(),
ctx,
)
.await?;
@@ -1167,6 +1170,7 @@ impl TimelineAdaptor {
self.timeline.tenant_shard_id,
key_range,
lsn,
ctx,
)
.await?;

View File

@@ -215,6 +215,7 @@ pub(super) async fn prepare(
&detached
.conf
.timeline_path(&detached.tenant_shard_id, &detached.timeline_id),
ctx,
)
.await
.fatal_err("VirtualFile::open for timeline dir fsync");
@@ -339,6 +340,7 @@ async fn copy_lsn_prefix(
target_timeline.tenant_shard_id,
layer.layer_desc().key_range.start,
layer.layer_desc().lsn_range.start..end_lsn,
ctx,
)
.await
.map_err(CopyDeltaPrefix)?;

View File

@@ -9,6 +9,7 @@ use utils::{
use crate::{
config::PageServerConf,
context::RequestContext,
metrics::TimelineMetrics,
tenant::{
layer_map::{BatchedUpdates, LayerMap},
@@ -69,6 +70,7 @@ impl LayerManager {
conf: &'static PageServerConf,
timeline_id: TimelineId,
tenant_shard_id: TenantShardId,
ctx: &RequestContext,
) -> Result<Arc<InMemoryLayer>> {
ensure!(lsn.is_aligned());
@@ -105,7 +107,7 @@ impl LayerManager {
);
let new_layer =
InMemoryLayer::create(conf, timeline_id, tenant_shard_id, start_lsn).await?;
InMemoryLayer::create(conf, timeline_id, tenant_shard_id, start_lsn, ctx).await?;
let layer = Arc::new(new_layer);
self.layer_map.open_layer = Some(layer.clone());

View File

@@ -23,6 +23,7 @@ use pageserver_api::key::Key;
use utils::lsn::Lsn;
use utils::vec_map::VecMap;
use crate::context::RequestContext;
use crate::virtual_file::VirtualFile;
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
@@ -285,6 +286,7 @@ impl<'a> VectoredBlobReader<'a> {
&self,
read: &VectoredRead,
buf: BytesMut,
ctx: &RequestContext,
) -> Result<VectoredBlobsBuf, std::io::Error> {
assert!(read.size() > 0);
assert!(
@@ -295,7 +297,7 @@ impl<'a> VectoredBlobReader<'a> {
);
let buf = self
.file
.read_exact_at_n(buf, read.start, read.size())
.read_exact_at_n(buf, read.start, read.size(), ctx)
.await?;
let blobs_at = read.blobs_at.as_slice();

View File

@@ -344,16 +344,23 @@ macro_rules! with_file {
impl VirtualFile {
/// Open a file in read-only mode. Like File::open.
pub async fn open(path: &Utf8Path) -> Result<VirtualFile, std::io::Error> {
Self::open_with_options(path, OpenOptions::new().read(true)).await
pub async fn open(
path: &Utf8Path,
ctx: &RequestContext,
) -> Result<VirtualFile, std::io::Error> {
Self::open_with_options(path, OpenOptions::new().read(true), ctx).await
}
/// Create a new file for writing. If the file exists, it will be truncated.
/// Like File::create.
pub async fn create(path: &Utf8Path) -> Result<VirtualFile, std::io::Error> {
pub async fn create(
path: &Utf8Path,
ctx: &RequestContext,
) -> Result<VirtualFile, std::io::Error> {
Self::open_with_options(
path,
OpenOptions::new().write(true).create(true).truncate(true),
ctx,
)
.await
}
@@ -366,6 +373,7 @@ impl VirtualFile {
pub async fn open_with_options(
path: &Utf8Path,
open_options: &OpenOptions,
_ctx: &RequestContext, /* TODO: carry a pointer to the metrics in the RequestContext instead of the parsing https://github.com/neondatabase/neon/issues/6107 */
) -> Result<VirtualFile, std::io::Error> {
let path_str = path.to_string();
let parts = path_str.split('/').collect::<Vec<&str>>();
@@ -576,21 +584,34 @@ impl VirtualFile {
Ok(self.pos)
}
pub async fn read_exact_at<B>(&self, buf: B, offset: u64) -> Result<B, Error>
pub async fn read_exact_at<B>(
&self,
buf: B,
offset: u64,
ctx: &RequestContext,
) -> Result<B, Error>
where
B: IoBufMut + Send,
{
let (buf, res) =
read_exact_at_impl(buf, offset, None, |buf, offset| self.read_at(buf, offset)).await;
let (buf, res) = read_exact_at_impl(buf, offset, None, |buf, offset| {
self.read_at(buf, offset, ctx)
})
.await;
res.map(|()| buf)
}
pub async fn read_exact_at_n<B>(&self, buf: B, offset: u64, count: usize) -> Result<B, Error>
pub async fn read_exact_at_n<B>(
&self,
buf: B,
offset: u64,
count: usize,
ctx: &RequestContext,
) -> Result<B, Error>
where
B: IoBufMut + Send,
{
let (buf, res) = read_exact_at_impl(buf, offset, Some(count), |buf, offset| {
self.read_at(buf, offset)
self.read_at(buf, offset, ctx)
})
.await;
res.map(|()| buf)
@@ -601,12 +622,13 @@ impl VirtualFile {
&self,
page: PageWriteGuard<'static>,
offset: u64,
ctx: &RequestContext,
) -> Result<PageWriteGuard<'static>, Error> {
let buf = PageWriteGuardBuf {
page,
init_up_to: 0,
};
let res = self.read_exact_at(buf, offset).await;
let res = self.read_exact_at(buf, offset, ctx).await;
res.map(|PageWriteGuardBuf { page, .. }| page)
.map_err(|e| Error::new(ErrorKind::Other, e))
}
@@ -699,7 +721,12 @@ impl VirtualFile {
(buf, Ok(n))
}
pub(crate) async fn read_at<B>(&self, buf: B, offset: u64) -> (B, Result<usize, Error>)
pub(crate) async fn read_at<B>(
&self,
buf: B,
offset: u64,
_ctx: &RequestContext, /* TODO: use for metrics: https://github.com/neondatabase/neon/issues/6107 */
) -> (B, Result<usize, Error>)
where
B: tokio_epoll_uring::BoundedBufMut + Send,
{
@@ -1020,20 +1047,21 @@ impl VirtualFile {
pub(crate) async fn read_blk(
&self,
blknum: u32,
ctx: &RequestContext,
) -> Result<crate::tenant::block_io::BlockLease<'_>, std::io::Error> {
use crate::page_cache::PAGE_SZ;
let buf = vec![0; PAGE_SZ];
let buf = self
.read_exact_at(buf, blknum as u64 * (PAGE_SZ as u64))
.read_exact_at(buf, blknum as u64 * (PAGE_SZ as u64), ctx)
.await?;
Ok(crate::tenant::block_io::BlockLease::Vec(buf))
}
async fn read_to_end(&mut self, buf: &mut Vec<u8>) -> Result<(), Error> {
async fn read_to_end(&mut self, buf: &mut Vec<u8>, ctx: &RequestContext) -> Result<(), Error> {
let mut tmp = vec![0; 128];
loop {
let res;
(tmp, res) = self.read_at(tmp, self.pos).await;
(tmp, res) = self.read_at(tmp, self.pos, ctx).await;
match res {
Ok(0) => return Ok(()),
Ok(n) => {
@@ -1159,7 +1187,6 @@ mod tests {
use rand::seq::SliceRandom;
use rand::thread_rng;
use rand::Rng;
use std::future::Future;
use std::io::Write;
use std::os::unix::fs::FileExt;
use std::sync::Arc;
@@ -1176,9 +1203,14 @@ mod tests {
}
impl MaybeVirtualFile {
async fn read_exact_at(&self, mut buf: Vec<u8>, offset: u64) -> Result<Vec<u8>, Error> {
async fn read_exact_at(
&self,
mut buf: Vec<u8>,
offset: u64,
ctx: &RequestContext,
) -> Result<Vec<u8>, Error> {
match self {
MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(buf, offset).await,
MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(buf, offset, ctx).await,
MaybeVirtualFile::File(file) => file.read_exact_at(&mut buf, offset).map(|()| buf),
}
}
@@ -1230,13 +1262,13 @@ mod tests {
// Helper function to slurp contents of a file, starting at the current position,
// into a string
async fn read_string(&mut self) -> Result<String, Error> {
async fn read_string(&mut self, ctx: &RequestContext) -> Result<String, Error> {
use std::io::Read;
let mut buf = String::new();
match self {
MaybeVirtualFile::VirtualFile(file) => {
let mut buf = Vec::new();
file.read_to_end(&mut buf).await?;
file.read_to_end(&mut buf, ctx).await?;
return Ok(String::from_utf8(buf).unwrap());
}
MaybeVirtualFile::File(file) => {
@@ -1247,9 +1279,14 @@ mod tests {
}
// Helper function to slurp a portion of a file into a string
async fn read_string_at(&mut self, pos: u64, len: usize) -> Result<String, Error> {
async fn read_string_at(
&mut self,
pos: u64,
len: usize,
ctx: &RequestContext,
) -> Result<String, Error> {
let buf = vec![0; len];
let buf = self.read_exact_at(buf, pos).await?;
let buf = self.read_exact_at(buf, pos, ctx).await?;
Ok(String::from_utf8(buf).unwrap())
}
}
@@ -1263,73 +1300,101 @@ mod tests {
// results with VirtualFiles as with native Files. (Except that with
// native files, you will run out of file descriptors if the ulimit
// is low enough.)
test_files("virtual_files", |path, open_options| async move {
let vf = VirtualFile::open_with_options(&path, &open_options).await?;
Ok(MaybeVirtualFile::VirtualFile(vf))
})
.await
struct A;
impl Adapter for A {
async fn open(
path: Utf8PathBuf,
opts: OpenOptions,
ctx: &RequestContext,
) -> Result<MaybeVirtualFile, anyhow::Error> {
let vf = VirtualFile::open_with_options(&path, &opts, ctx).await?;
Ok(MaybeVirtualFile::VirtualFile(vf))
}
}
test_files::<A>("virtual_files").await
}
#[tokio::test]
async fn test_physical_files() -> anyhow::Result<()> {
test_files("physical_files", |path, open_options| async move {
Ok(MaybeVirtualFile::File({
let owned_fd = open_options.open(path.as_std_path()).await?;
File::from(owned_fd)
}))
})
.await
struct B;
impl Adapter for B {
async fn open(
path: Utf8PathBuf,
opts: OpenOptions,
_ctx: &RequestContext,
) -> Result<MaybeVirtualFile, anyhow::Error> {
Ok(MaybeVirtualFile::File({
let owned_fd = opts.open(path.as_std_path()).await?;
File::from(owned_fd)
}))
}
}
test_files::<B>("physical_files").await
}
async fn test_files<OF, FT>(testname: &str, openfunc: OF) -> anyhow::Result<()>
/// This is essentially a closure which returns a MaybeVirtualFile, but because rust edition
/// 2024 is not yet out with new lifetime capture or outlives rules, this is a async function
/// in trait which benefits from the new lifetime capture rules already.
trait Adapter {
async fn open(
path: Utf8PathBuf,
opts: OpenOptions,
ctx: &RequestContext,
) -> Result<MaybeVirtualFile, anyhow::Error>;
}
async fn test_files<A>(testname: &str) -> anyhow::Result<()>
where
OF: Fn(Utf8PathBuf, OpenOptions) -> FT,
FT: Future<Output = Result<MaybeVirtualFile, std::io::Error>>,
A: Adapter,
{
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
let testdir = crate::config::PageServerConf::test_repo_dir(testname);
std::fs::create_dir_all(&testdir)?;
let path_a = testdir.join("file_a");
let mut file_a = openfunc(
let mut file_a = A::open(
path_a.clone(),
OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.to_owned(),
&ctx,
)
.await?;
file_a.write_all(b"foobar".to_vec(), &ctx).await?;
// cannot read from a file opened in write-only mode
let _ = file_a.read_string().await.unwrap_err();
let _ = file_a.read_string(&ctx).await.unwrap_err();
// Close the file and re-open for reading
let mut file_a = openfunc(path_a, OpenOptions::new().read(true).to_owned()).await?;
let mut file_a = A::open(path_a, OpenOptions::new().read(true).to_owned(), &ctx).await?;
// cannot write to a file opened in read-only mode
let _ = file_a.write_all(b"bar".to_vec(), &ctx).await.unwrap_err();
// Try simple read
assert_eq!("foobar", file_a.read_string().await?);
assert_eq!("foobar", file_a.read_string(&ctx).await?);
// It's positioned at the EOF now.
assert_eq!("", file_a.read_string().await?);
assert_eq!("", file_a.read_string(&ctx).await?);
// Test seeks.
assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
assert_eq!("oobar", file_a.read_string().await?);
assert_eq!("oobar", file_a.read_string(&ctx).await?);
assert_eq!(file_a.seek(SeekFrom::End(-2)).await?, 4);
assert_eq!("ar", file_a.read_string().await?);
assert_eq!("ar", file_a.read_string(&ctx).await?);
assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
assert_eq!(file_a.seek(SeekFrom::Current(2)).await?, 3);
assert_eq!("bar", file_a.read_string().await?);
assert_eq!("bar", file_a.read_string(&ctx).await?);
assert_eq!(file_a.seek(SeekFrom::Current(-5)).await?, 1);
assert_eq!("oobar", file_a.read_string().await?);
assert_eq!("oobar", file_a.read_string(&ctx).await?);
// Test erroneous seeks to before byte 0
file_a.seek(SeekFrom::End(-7)).await.unwrap_err();
@@ -1337,11 +1402,11 @@ mod tests {
file_a.seek(SeekFrom::Current(-2)).await.unwrap_err();
// the erroneous seek should have left the position unchanged
assert_eq!("oobar", file_a.read_string().await?);
assert_eq!("oobar", file_a.read_string(&ctx).await?);
// Create another test file, and try FileExt functions on it.
let path_b = testdir.join("file_b");
let mut file_b = openfunc(
let mut file_b = A::open(
path_b.clone(),
OpenOptions::new()
.read(true)
@@ -1349,12 +1414,13 @@ mod tests {
.create(true)
.truncate(true)
.to_owned(),
&ctx,
)
.await?;
file_b.write_all_at(b"BAR".to_vec(), 3, &ctx).await?;
file_b.write_all_at(b"FOO".to_vec(), 0, &ctx).await?;
assert_eq!(file_b.read_string_at(2, 3).await?, "OBA");
assert_eq!(file_b.read_string_at(2, 3, &ctx).await?, "OBA");
// Open a lot of files, enough to cause some evictions. (Or to be precise,
// open the same file many times. The effect is the same.)
@@ -1364,9 +1430,13 @@ mod tests {
let mut vfiles = Vec::new();
for _ in 0..100 {
let mut vfile =
openfunc(path_b.clone(), OpenOptions::new().read(true).to_owned()).await?;
assert_eq!("FOOBAR", vfile.read_string().await?);
let mut vfile = A::open(
path_b.clone(),
OpenOptions::new().read(true).to_owned(),
&ctx,
)
.await?;
assert_eq!("FOOBAR", vfile.read_string(&ctx).await?);
vfiles.push(vfile);
}
@@ -1375,13 +1445,13 @@ mod tests {
// The underlying file descriptor for 'file_a' should be closed now. Try to read
// from it again. We left the file positioned at offset 1 above.
assert_eq!("oobar", file_a.read_string().await?);
assert_eq!("oobar", file_a.read_string(&ctx).await?);
// Check that all the other FDs still work too. Use them in random order for
// good measure.
vfiles.as_mut_slice().shuffle(&mut thread_rng());
for vfile in vfiles.iter_mut() {
assert_eq!("OOBAR", vfile.read_string_at(1, 5).await?);
assert_eq!("OOBAR", vfile.read_string_at(1, 5, &ctx).await?);
}
Ok(())
@@ -1397,6 +1467,7 @@ mod tests {
const THREADS: usize = 100;
const SAMPLE: [u8; SIZE] = [0xADu8; SIZE];
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
let testdir = crate::config::PageServerConf::test_repo_dir("vfile_concurrency");
std::fs::create_dir_all(&testdir)?;
@@ -1410,8 +1481,12 @@ mod tests {
// Open the file many times.
let mut files = Vec::new();
for _ in 0..VIRTUAL_FILES {
let f = VirtualFile::open_with_options(&test_file_path, OpenOptions::new().read(true))
.await?;
let f = VirtualFile::open_with_options(
&test_file_path,
OpenOptions::new().read(true),
&ctx,
)
.await?;
files.push(f);
}
let files = Arc::new(files);
@@ -1425,12 +1500,13 @@ mod tests {
let mut hdls = Vec::new();
for _threadno in 0..THREADS {
let files = files.clone();
let ctx = ctx.detached_child(TaskKind::UnitTest, DownloadBehavior::Error);
let hdl = rt.spawn(async move {
let mut buf = vec![0u8; SIZE];
let mut rng = rand::rngs::OsRng;
for _ in 1..1000 {
let f = &files[rng.gen_range(0..files.len())];
buf = f.read_exact_at(buf, 0).await.unwrap();
buf = f.read_exact_at(buf, 0, &ctx).await.unwrap();
assert!(buf == SAMPLE);
}
});
@@ -1446,6 +1522,7 @@ mod tests {
#[tokio::test]
async fn test_atomic_overwrite_basic() {
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
let testdir = crate::config::PageServerConf::test_repo_dir("test_atomic_overwrite_basic");
std::fs::create_dir_all(&testdir).unwrap();
@@ -1455,8 +1532,8 @@ mod tests {
VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
.await
.unwrap();
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path).await.unwrap());
let post = file.read_string().await.unwrap();
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
let post = file.read_string(&ctx).await.unwrap();
assert_eq!(post, "foo");
assert!(!tmp_path.exists());
drop(file);
@@ -1464,8 +1541,8 @@ mod tests {
VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"bar".to_vec())
.await
.unwrap();
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path).await.unwrap());
let post = file.read_string().await.unwrap();
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
let post = file.read_string(&ctx).await.unwrap();
assert_eq!(post, "bar");
assert!(!tmp_path.exists());
drop(file);
@@ -1473,6 +1550,7 @@ mod tests {
#[tokio::test]
async fn test_atomic_overwrite_preexisting_tmp() {
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
let testdir =
crate::config::PageServerConf::test_repo_dir("test_atomic_overwrite_preexisting_tmp");
std::fs::create_dir_all(&testdir).unwrap();
@@ -1487,8 +1565,8 @@ mod tests {
.await
.unwrap();
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path).await.unwrap());
let post = file.read_string().await.unwrap();
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
let post = file.read_string(&ctx).await.unwrap();
assert_eq!(post, "foo");
assert!(!tmp_path.exists());
drop(file);

View File

@@ -49,7 +49,7 @@ pub mod defaults {
pub const DEFAULT_HEARTBEAT_TIMEOUT: &str = "5000ms";
pub const DEFAULT_MAX_OFFLOADER_LAG_BYTES: u64 = 128 * (1 << 20);
pub const DEFAULT_PARTIAL_BACKUP_TIMEOUT: &str = "15m";
pub const DEFAULT_PARTIAL_BACKUP_TIMEOUT: &str = "3h";
}
#[derive(Debug, Clone)]

View File

@@ -1,6 +1,6 @@
//! Safekeeper timeline has a background task which is subscribed to `commit_lsn`
//! and `flush_lsn` updates. After the partial segment was updated (`flush_lsn`
//! was changed), the segment will be uploaded to S3 in about 15 minutes.
//! was changed), the segment will be uploaded to S3 in about 3 hours.
//!
//! The filename format for partial segments is
//! `Segment_Term_Flush_Commit_skNN.partial`, where:

18
scripts/check_allowed_errors.sh Executable file
View File

@@ -0,0 +1,18 @@
#!/usr/bin/env bash
set -eu
HELPER_DIR="$(dirname "${BASH_SOURCE[0]}")"
SCRIPT="test_runner/fixtures/pageserver/allowed_errors.py"
# first run to understand all of the errors:
#
# example: ./scripts/check_allowed_errors.sh -i - < pageserver.log
# example: ./scripts/check_allowed_errors.sh -i pageserver.log
#
# then edit the test local allowed_errors to the
# test_runner/fixtures/pageserver/allowed_errors.py, then re-run to make sure
# they are handled.
#
# finally revert any local changes to allowed_errors.py.
poetry run python3 "$HELPER_DIR/../$SCRIPT" $*

View File

@@ -19,9 +19,9 @@ from _pytest.config.argparsing import Parser
from _pytest.fixtures import FixtureRequest
from _pytest.terminal import TerminalReporter
from fixtures.common_types import TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonPageserver
from fixtures.types import TenantId, TimelineId
"""
This file contains fixtures for micro-benchmarks.

View File

@@ -5,8 +5,8 @@ import pytest
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response
from fixtures.common_types import TenantId
from fixtures.log_helper import log
from fixtures.types import TenantId
class ComputeReconfigure:

View File

@@ -47,14 +47,15 @@ from urllib3.util.retry import Retry
from fixtures import overlayfs
from fixtures.broker import NeonBroker
from fixtures.common_types import Lsn, TenantId, TenantShardId, TimelineId
from fixtures.log_helper import log
from fixtures.metrics import Metrics, MetricsGetter, parse_metrics
from fixtures.pageserver.allowed_errors import (
DEFAULT_PAGESERVER_ALLOWED_ERRORS,
DEFAULT_STORAGE_CONTROLLER_ALLOWED_ERRORS,
)
from fixtures.pageserver.common_types import IndexPartDump, LayerName, parse_layer_file_name
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.pageserver.types import IndexPartDump, LayerName, parse_layer_file_name
from fixtures.pageserver.utils import (
wait_for_last_record_lsn,
wait_for_upload,
@@ -72,7 +73,6 @@ from fixtures.remote_storage import (
)
from fixtures.safekeeper.http import SafekeeperHttpClient
from fixtures.safekeeper.utils import are_walreceivers_absent
from fixtures.types import Lsn, TenantId, TenantShardId, TimelineId
from fixtures.utils import (
ATTACHMENT_NAME_REGEX,
allure_add_grafana_links,

View File

@@ -131,9 +131,10 @@ if __name__ == "__main__":
"-i",
"--input",
type=argparse.FileType("r"),
default=sys.stdin,
help="Pageserver logs file. Reads from stdin if no file is provided.",
help="Pageserver logs file. Use '-' for stdin.",
required=True,
)
args = parser.parse_args()
errors = _check_allowed_errors(args.input)

View File

@@ -2,7 +2,7 @@ import re
from dataclasses import dataclass
from typing import Any, Dict, Tuple, Union
from fixtures.types import KEY_MAX, KEY_MIN, Key, Lsn
from fixtures.common_types import KEY_MAX, KEY_MIN, Key, Lsn
@dataclass

View File

@@ -11,10 +11,10 @@ import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from fixtures.common_types import Lsn, TenantId, TenantShardId, TimelineId
from fixtures.log_helper import log
from fixtures.metrics import Metrics, MetricsGetter, parse_metrics
from fixtures.pg_version import PgVersion
from fixtures.types import Lsn, TenantId, TenantShardId, TimelineId
from fixtures.utils import Fn

View File

@@ -3,6 +3,7 @@ import time
from typing import Any, Callable, Dict, Tuple
import fixtures.pageserver.remote_storage
from fixtures.common_types import TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
@@ -12,7 +13,6 @@ from fixtures.pageserver.utils import (
wait_until_tenant_state,
)
from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind
from fixtures.types import TenantId, TimelineId
def single_timeline(

View File

@@ -6,13 +6,13 @@ import threading
from pathlib import Path
from typing import Any, List, Tuple
from fixtures.common_types import TenantId, TimelineId
from fixtures.neon_fixtures import NeonEnv, Pagectl
from fixtures.pageserver.types import (
from fixtures.pageserver.common_types import (
InvalidFileName,
parse_layer_file_name,
)
from fixtures.remote_storage import LocalFsStorage
from fixtures.types import TenantId, TimelineId
def duplicate_one_tenant(env: NeonEnv, template_tenant: TenantId, new_tenant: TenantId):

View File

@@ -8,10 +8,10 @@ from mypy_boto3_s3.type_defs import (
ObjectTypeDef,
)
from fixtures.common_types import Lsn, TenantId, TenantShardId, TimelineId
from fixtures.log_helper import log
from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient
from fixtures.remote_storage import RemoteStorage, RemoteStorageKind, S3Storage
from fixtures.types import Lsn, TenantId, TenantShardId, TimelineId
from fixtures.utils import wait_until

View File

@@ -12,8 +12,8 @@ import boto3
import toml
from mypy_boto3_s3 import S3Client
from fixtures.common_types import TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.types import TenantId, TimelineId
TIMELINE_INDEX_PART_FILE_NAME = "index_part.json"
TENANT_HEATMAP_FILE_NAME = "heatmap-v1.json"

View File

@@ -6,8 +6,8 @@ from typing import Any, Dict, List, Optional, Tuple, Union
import pytest
import requests
from fixtures.common_types import Lsn, TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.types import Lsn, TenantId, TimelineId
# Walreceiver as returned by sk's timeline status endpoint.

View File

@@ -1,6 +1,6 @@
from fixtures.common_types import TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.safekeeper.http import SafekeeperHttpClient
from fixtures.types import TenantId, TimelineId
def are_walreceivers_absent(

View File

@@ -25,14 +25,14 @@ import zstandard
from psycopg2.extensions import cursor
from fixtures.log_helper import log
from fixtures.pageserver.types import (
from fixtures.pageserver.common_types import (
parse_delta_layer,
parse_image_layer,
)
if TYPE_CHECKING:
from fixtures.neon_fixtures import PgBin
from fixtures.types import TimelineId
from fixtures.common_types import TimelineId
Fn = TypeVar("Fn", bound=Callable[..., Any])
@@ -452,6 +452,7 @@ def humantime_to_ms(humantime: str) -> float:
def scan_log_for_errors(input: Iterable[str], allowed_errors: List[str]) -> List[Tuple[int, str]]:
# FIXME: this duplicates test_runner/fixtures/pageserver/allowed_errors.py
error_or_warn = re.compile(r"\s(ERROR|WARN)")
errors = []
for lineno, line in enumerate(input, start=1):
@@ -484,7 +485,7 @@ def assert_no_errors(log_file, service, allowed_errors):
for _lineno, error in errors:
log.info(f"not allowed {service} error: {error.strip()}")
assert not errors, f"Log errors on {service}: {errors[0]}"
assert not errors, f"First log error on {service}: {errors[0]}\nHint: use scripts/check_allowed_errors.sh to test any new allowed_error you add"
@enum.unique

View File

@@ -1,6 +1,7 @@
import threading
from typing import Any, Optional
from fixtures.common_types import TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
Endpoint,
@@ -10,7 +11,6 @@ from fixtures.neon_fixtures import (
wait_for_last_flush_lsn,
)
from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
from fixtures.types import TenantId, TimelineId
# neon_local doesn't handle creating/modifying endpoints concurrently, so we use a mutex
# to ensure we don't do that: this enables running lots of Workloads in parallel safely.

View File

@@ -5,13 +5,13 @@ Utilities used by all code in this sub-directory
from typing import Any, Callable, Dict, Tuple
import fixtures.pageserver.many_tenants as many_tenants
from fixtures.common_types import TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
)
from fixtures.pageserver.utils import wait_until_all_tenants_state
from fixtures.types import TenantId, TimelineId
def ensure_pageserver_ready_for_benchmarking(env: NeonEnv, n_tenants: int):

View File

@@ -9,11 +9,11 @@ from typing import List
import pytest
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
from fixtures.common_types import Lsn
from fixtures.compare_fixtures import NeonCompare
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonPageserver
from fixtures.pageserver.utils import wait_for_last_record_lsn
from fixtures.types import Lsn
from fixtures.utils import wait_until
from prometheus_client.samples import Sample

View File

@@ -2,10 +2,10 @@ from contextlib import closing
import pytest
from fixtures.benchmark_fixture import MetricReport
from fixtures.common_types import Lsn
from fixtures.compare_fixtures import NeonCompare, PgCompare
from fixtures.pageserver.utils import wait_tenant_status_404
from fixtures.pg_version import PgVersion
from fixtures.types import Lsn
#

View File

@@ -3,6 +3,7 @@ import random
import time
import pytest
from fixtures.common_types import TenantId, TenantShardId, TimelineId
from fixtures.compute_reconfigure import ComputeReconfigure
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
@@ -10,7 +11,6 @@ from fixtures.neon_fixtures import (
)
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.pg_version import PgVersion
from fixtures.types import TenantId, TenantShardId, TimelineId
@pytest.mark.timeout(3600) # super long running test: should go down as we optimize

View File

@@ -6,10 +6,10 @@ from typing import Any, Callable, List
import pytest
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
from fixtures.common_types import Lsn
from fixtures.compare_fixtures import NeonCompare, PgCompare, VanillaCompare
from fixtures.log_helper import log
from fixtures.neon_fixtures import DEFAULT_BRANCH_NAME, NeonEnvBuilder, PgBin
from fixtures.types import Lsn
from performance.test_perf_pgbench import get_durations_matrix, get_scales_matrix

View File

@@ -1,6 +1,6 @@
from fixtures.common_types import TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.types import TimelineId
from fixtures.utils import query_scalar

View File

@@ -2,13 +2,13 @@ from dataclasses import dataclass
from typing import Generator, Optional
import pytest
from fixtures.common_types import TenantId
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
)
from fixtures.pageserver.http import PageserverApiException, TenantConfig
from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind
from fixtures.types import TenantId
from fixtures.utils import wait_until

View File

@@ -4,13 +4,13 @@ from pathlib import Path
import psycopg2
import pytest
from fixtures.common_types import TenantId, TimelineId
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
PgProtocol,
)
from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient
from fixtures.types import TenantId, TimelineId
def assert_client_authorized(env: NeonEnv, http_client: PageserverHttpClient):

View File

@@ -2,10 +2,10 @@ import threading
import time
import pytest
from fixtures.common_types import Lsn, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv
from fixtures.pageserver.http import TimelineCreate406
from fixtures.types import Lsn, TimelineId
from fixtures.utils import query_scalar

View File

@@ -1,8 +1,8 @@
import pytest
from fixtures.common_types import Lsn, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.pageserver.http import TimelineCreate406
from fixtures.types import Lsn, TimelineId
from fixtures.utils import print_gc_result, query_scalar

View File

@@ -5,6 +5,7 @@ from concurrent.futures import ThreadPoolExecutor
from typing import List
import pytest
from fixtures.common_types import Lsn, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
Endpoint,
@@ -14,7 +15,6 @@ from fixtures.neon_fixtures import (
)
from fixtures.pageserver.http import PageserverApiException
from fixtures.pageserver.utils import wait_until_tenant_active
from fixtures.types import Lsn, TimelineId
from fixtures.utils import query_scalar
from performance.test_perf_pgbench import get_scales_matrix
from requests import RequestException

View File

@@ -3,6 +3,7 @@ import os
from typing import List, Tuple
import pytest
from fixtures.common_types import TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
Endpoint,
@@ -11,7 +12,6 @@ from fixtures.neon_fixtures import (
wait_for_last_flush_lsn,
)
from fixtures.pg_version import PgVersion
from fixtures.types import TenantId, TimelineId
# Test restarting page server, while safekeeper and compute node keep

View File

@@ -7,6 +7,7 @@ from typing import List, Optional
import pytest
import toml
from fixtures.common_types import Lsn
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
@@ -21,7 +22,6 @@ from fixtures.pageserver.utils import (
)
from fixtures.pg_version import PgVersion
from fixtures.remote_storage import RemoteStorageKind
from fixtures.types import Lsn
#
# A test suite that help to prevent unintentionally breaking backward or forward compatibility between Neon releases.

View File

@@ -5,6 +5,7 @@ from dataclasses import dataclass
from typing import Any, Dict, Iterable, Tuple
import pytest
from fixtures.common_types import Lsn, TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
@@ -16,7 +17,6 @@ from fixtures.neon_fixtures import (
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.pageserver.utils import wait_for_upload_queue_empty
from fixtures.remote_storage import RemoteStorageKind
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import wait_until
GLOBAL_LRU_LOG_LINE = "tenant_min_resident_size-respecting LRU would not relieve pressure, evicting more following global LRU policy"
@@ -623,15 +623,16 @@ def test_partial_evict_tenant(eviction_env: EvictionEnv, order: EvictionOrder):
ratio = count_now / original_count
abs_diff = abs(ratio - expected_ratio)
assert original_count > count_now
log.info(
f"tenant {tenant_id} layer count {original_count} -> {count_now}, ratio: {ratio}, expecting {abs_diff} < 0.1"
)
expectation = 0.06
log.info(
f"tenant {tenant_id} layer count {original_count} -> {count_now}, ratio: {ratio}, expecting {abs_diff} < {expectation}"
)
# in this test case both relative_spare and relative_equal produce
# the same outcomes; this must be a quantization effect of similar
# sizes (-s4 and -s6) and small (5MB) layer size.
# for pg15 and pg16 the absdiff is < 0.01, for pg14 it is closer to 0.02
assert abs_diff < 0.05
assert abs_diff < expectation
@pytest.mark.parametrize(

View File

@@ -2,7 +2,7 @@ import time
import pytest
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin, wait_for_last_flush_lsn
from fixtures.pageserver.types import parse_layer_file_name
from fixtures.pageserver.common_types import parse_layer_file_name
from fixtures.pageserver.utils import (
wait_for_last_record_lsn,
wait_for_upload_queue_empty,

View File

@@ -1,6 +1,7 @@
import os
from pathlib import Path
from fixtures.common_types import Lsn, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnvBuilder,
@@ -8,7 +9,6 @@ from fixtures.neon_fixtures import (
VanillaPostgres,
)
from fixtures.port_distributor import PortDistributor
from fixtures.types import Lsn, TimelineId
from fixtures.utils import query_scalar, subprocess_capture
num_rows = 1000

View File

@@ -2,6 +2,7 @@ import asyncio
import concurrent.futures
import random
from fixtures.common_types import TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
Endpoint,
@@ -10,7 +11,6 @@ from fixtures.neon_fixtures import (
wait_for_last_flush_lsn,
)
from fixtures.remote_storage import RemoteStorageKind
from fixtures.types import TimelineId
# Test configuration
#

View File

@@ -7,6 +7,7 @@ from contextlib import closing
from pathlib import Path
import pytest
from fixtures.common_types import Lsn, TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
Endpoint,
@@ -20,7 +21,6 @@ from fixtures.pageserver.utils import (
wait_for_upload,
)
from fixtures.remote_storage import RemoteStorageKind
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import subprocess_capture

View File

@@ -7,7 +7,7 @@ from fixtures.neon_fixtures import (
flush_ep_to_pageserver,
wait_for_last_flush_lsn,
)
from fixtures.pageserver.types import parse_layer_file_name
from fixtures.pageserver.common_types import parse_layer_file_name
from fixtures.pageserver.utils import wait_for_upload
from fixtures.remote_storage import RemoteStorageKind

View File

@@ -1,8 +1,9 @@
import time
from fixtures.common_types import Lsn
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder, flush_ep_to_pageserver
from fixtures.pageserver.types import (
from fixtures.pageserver.common_types import (
DeltaLayerName,
ImageLayerName,
is_future_layer,
@@ -13,7 +14,6 @@ from fixtures.pageserver.utils import (
wait_until_tenant_active,
)
from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind
from fixtures.types import Lsn
from fixtures.utils import query_scalar, wait_until

View File

@@ -4,6 +4,7 @@ from random import choice
from string import ascii_lowercase
import pytest
from fixtures.common_types import Lsn
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
AuxFileStore,
@@ -12,7 +13,6 @@ from fixtures.neon_fixtures import (
logical_replication_sync,
wait_for_last_flush_lsn,
)
from fixtures.types import Lsn
from fixtures.utils import query_scalar, wait_until

View File

@@ -2,10 +2,10 @@ import re
import time
from datetime import datetime, timedelta, timezone
from fixtures.common_types import Lsn
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder, wait_for_last_flush_lsn
from fixtures.pageserver.http import PageserverApiException
from fixtures.types import Lsn
from fixtures.utils import query_scalar

View File

@@ -5,6 +5,7 @@ from typing import cast
import pytest
import requests
from fixtures.common_types import TenantId, TimelineId
from fixtures.neon_fixtures import (
DEFAULT_BRANCH_NAME,
NeonEnv,
@@ -13,7 +14,6 @@ from fixtures.neon_fixtures import (
)
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.pg_version import PgVersion, skip_on_postgres
from fixtures.types import TenantId, TimelineId
def helper_compare_timeline_list(

View File

@@ -3,13 +3,13 @@ import os
import time
from pathlib import Path
from fixtures.common_types import Lsn, TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder, wait_for_wal_insert_lsn
from fixtures.pageserver.utils import (
wait_for_last_record_lsn,
)
from fixtures.remote_storage import RemoteStorageKind
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import query_scalar

View File

@@ -1,6 +1,6 @@
from fixtures.common_types import TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.types import TimelineId
from fixtures.utils import print_gc_result, query_scalar

View File

@@ -5,6 +5,7 @@ import time
from collections import defaultdict
from typing import Any, DefaultDict, Dict, Tuple
from fixtures.common_types import Lsn
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnvBuilder,
@@ -21,7 +22,6 @@ from fixtures.pageserver.utils import (
wait_until_tenant_active,
)
from fixtures.remote_storage import RemoteStorageKind
from fixtures.types import Lsn
from fixtures.utils import query_scalar, wait_until

View File

@@ -1,9 +1,9 @@
from typing import Optional
import pytest
from fixtures.common_types import Lsn
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder, tenant_get_shards
from fixtures.types import Lsn
from fixtures.utils import query_scalar

View File

@@ -3,13 +3,13 @@ from pathlib import Path
from typing import Optional
import toml
from fixtures.common_types import Lsn, TenantId, TimelineId
from fixtures.neon_fixtures import (
DEFAULT_BRANCH_NAME,
NeonEnv,
NeonEnvBuilder,
)
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import wait_until

View File

@@ -16,6 +16,7 @@ import time
from typing import Optional
import pytest
from fixtures.common_types import TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
@@ -37,7 +38,6 @@ from fixtures.pageserver.utils import (
from fixtures.remote_storage import (
RemoteStorageKind,
)
from fixtures.types import TenantId, TimelineId
from fixtures.utils import print_gc_result, wait_until
from fixtures.workload import Workload

View File

@@ -2,10 +2,10 @@ import json
import uuid
from anyio import Path
from fixtures.common_types import TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin
from fixtures.pg_version import PgVersion
from fixtures.types import TenantId, TimelineId
from fixtures.utils import wait_until

View File

@@ -5,6 +5,7 @@ from typing import Optional, Tuple
import psutil
import pytest
from fixtures.common_types import Lsn, TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
@@ -13,7 +14,6 @@ from fixtures.neon_fixtures import (
)
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import wait_until
TIMELINE_COUNT = 10

View File

@@ -7,6 +7,7 @@ from pathlib import Path
from queue import SimpleQueue
from typing import Any, Dict, Set
from fixtures.common_types import TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnvBuilder,
@@ -17,7 +18,6 @@ from fixtures.remote_storage import (
RemoteStorageKind,
remote_storage_to_toml_inline_table,
)
from fixtures.types import TenantId, TimelineId
from pytest_httpserver import HTTPServer
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response

View File

@@ -5,9 +5,10 @@ import time
from typing import Any, Dict, Optional
import pytest
from fixtures.common_types import TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder, NeonPageserver, S3Scrubber
from fixtures.pageserver.types import parse_layer_file_name
from fixtures.pageserver.common_types import parse_layer_file_name
from fixtures.pageserver.utils import (
assert_prefix_empty,
poll_for_remote_storage_iterations,
@@ -15,7 +16,6 @@ from fixtures.pageserver.utils import (
wait_for_upload_queue_empty,
)
from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind, S3Storage
from fixtures.types import TenantId, TimelineId
from fixtures.utils import wait_until
from fixtures.workload import Workload

View File

@@ -1,6 +1,6 @@
from fixtures.common_types import TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.types import TimelineId
from fixtures.utils import print_gc_result, query_scalar

View File

@@ -1,8 +1,8 @@
from contextlib import closing
from fixtures.common_types import Lsn
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.pageserver.utils import wait_for_last_record_lsn
from fixtures.types import Lsn
from fixtures.utils import query_scalar

View File

@@ -1,8 +1,8 @@
import pytest
from fixtures.common_types import Lsn
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv
from fixtures.pageserver.utils import wait_for_last_record_lsn
from fixtures.types import Lsn
from fixtures.utils import query_scalar

View File

@@ -6,13 +6,14 @@ import time
from typing import Dict, List, Optional, Tuple
import pytest
from fixtures.common_types import Lsn, TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnvBuilder,
wait_for_last_flush_lsn,
)
from fixtures.pageserver.common_types import parse_layer_file_name
from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient
from fixtures.pageserver.types import parse_layer_file_name
from fixtures.pageserver.utils import (
timeline_delete_wait_completed,
wait_for_last_record_lsn,
@@ -25,7 +26,6 @@ from fixtures.remote_storage import (
RemoteStorageKind,
available_remote_storages,
)
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import (
assert_eq,
assert_ge,

View File

@@ -1,6 +1,7 @@
import time
from datetime import datetime, timezone
from fixtures.common_types import Lsn
from fixtures.neon_fixtures import (
NeonEnvBuilder,
PgBin,
@@ -14,7 +15,6 @@ from fixtures.pageserver.utils import (
wait_for_upload,
)
from fixtures.remote_storage import RemoteStorageKind, s3_storage
from fixtures.types import Lsn
from fixtures.utils import run_pg_bench_small

View File

@@ -3,12 +3,12 @@ import shutil
from typing import Optional
import pytest
from fixtures.common_types import TenantShardId
from fixtures.neon_fixtures import (
NeonEnvBuilder,
S3Scrubber,
)
from fixtures.remote_storage import S3Storage, s3_storage
from fixtures.types import TenantShardId
from fixtures.workload import Workload

View File

@@ -5,6 +5,7 @@ from typing import Dict, List, Optional, Union
import pytest
import requests
from fixtures.common_types import Lsn, TenantId, TenantShardId, TimelineId
from fixtures.compute_reconfigure import ComputeReconfigure
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
@@ -18,7 +19,6 @@ from fixtures.neon_fixtures import (
)
from fixtures.pageserver.utils import assert_prefix_empty, assert_prefix_not_empty
from fixtures.remote_storage import s3_storage
from fixtures.types import Lsn, TenantId, TenantShardId, TimelineId
from fixtures.utils import wait_until
from fixtures.workload import Workload
from pytest_httpserver import HTTPServer

View File

@@ -6,6 +6,7 @@ from datetime import datetime, timezone
from typing import Any, Dict, List, Union
import pytest
from fixtures.common_types import TenantId, TenantShardId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
@@ -25,7 +26,6 @@ from fixtures.pageserver.utils import (
)
from fixtures.pg_version import PgVersion
from fixtures.remote_storage import RemoteStorageKind, s3_storage
from fixtures.types import TenantId, TenantShardId, TimelineId
from fixtures.utils import run_pg_bench_small, subprocess_capture, wait_until
from fixtures.workload import Workload
from mypy_boto3_s3.type_defs import (
@@ -1284,7 +1284,7 @@ def test_lock_time_tracing(neon_env_builder: NeonEnvBuilder):
# Apply failpoint
env.storage_controller.configure_failpoints(
("tenant-update-policy-exclusive-lock", "return(31000)")
("tenant-update-policy-exclusive-lock", "return(35000)")
)
# This will hold the exclusive for enough time to cause an warning
@@ -1306,7 +1306,7 @@ def test_lock_time_tracing(neon_env_builder: NeonEnvBuilder):
env.storage_controller.pageserver_api().timeline_create(
pg_version=PgVersion.NOT_SET, tenant_id=tenant_id, new_timeline_id=timeline_id
)
thread_update_tenant_policy.join(timeout=10)
thread_update_tenant_policy.join()
env.storage_controller.assert_log_contains("Lock on UpdatePolicy was held for")
env.storage_controller.assert_log_contains(

View File

@@ -2,13 +2,13 @@ import json
from contextlib import closing
import psycopg2.extras
from fixtures.common_types import Lsn
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnvBuilder,
)
from fixtures.pageserver.utils import assert_tenant_state, wait_for_upload
from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind
from fixtures.types import Lsn
from fixtures.utils import wait_until

View File

@@ -5,6 +5,7 @@ import shutil
from threading import Thread
import pytest
from fixtures.common_types import Lsn, TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnvBuilder,
@@ -26,7 +27,6 @@ from fixtures.pageserver.utils import (
wait_until_tenant_state,
)
from fixtures.remote_storage import RemoteStorageKind, available_s3_storages, s3_storage
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import run_pg_bench_small, wait_until
from requests.exceptions import ReadTimeout

View File

@@ -7,6 +7,7 @@ from typing import List, Optional
import asyncpg
import pytest
from fixtures.common_types import Lsn, TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
Endpoint,
@@ -22,7 +23,6 @@ from fixtures.pageserver.utils import (
from fixtures.remote_storage import (
RemoteStorageKind,
)
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import query_scalar, wait_until
from prometheus_client.samples import Sample

View File

@@ -7,6 +7,7 @@ from pathlib import Path
from typing import Any, Dict, Optional, Tuple
import pytest
from fixtures.common_types import Lsn, TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import Endpoint, NeonEnvBuilder, NeonPageserver
from fixtures.pageserver.http import PageserverHttpClient
@@ -20,7 +21,6 @@ from fixtures.remote_storage import (
LocalFsStorage,
RemoteStorageKind,
)
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import (
query_scalar,
wait_until,

View File

@@ -4,6 +4,7 @@ from pathlib import Path
from typing import List, Tuple
import pytest
from fixtures.common_types import Lsn, TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
Endpoint,
@@ -19,7 +20,6 @@ from fixtures.pageserver.utils import (
wait_until_tenant_active,
)
from fixtures.pg_version import PgVersion
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import wait_until

View File

@@ -1,3 +1,4 @@
from fixtures.common_types import TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.pageserver.utils import (
@@ -5,7 +6,6 @@ from fixtures.pageserver.utils import (
timeline_delete_wait_completed,
wait_until_tenant_active,
)
from fixtures.types import TenantId, TimelineId
from fixtures.utils import wait_until

View File

@@ -9,6 +9,7 @@ from typing import List
import pytest
import requests
from fixtures.common_types import Lsn, TenantId
from fixtures.log_helper import log
from fixtures.metrics import (
PAGESERVER_GLOBAL_METRICS,
@@ -24,7 +25,6 @@ from fixtures.pageserver.http import PageserverApiException
from fixtures.pageserver.utils import timeline_delete_wait_completed, wait_until_tenant_active
from fixtures.pg_version import PgVersion
from fixtures.remote_storage import RemoteStorageKind
from fixtures.types import Lsn, TenantId
from fixtures.utils import wait_until
from prometheus_client.samples import Sample

View File

@@ -11,6 +11,7 @@ import os
from pathlib import Path
from typing import List, Tuple
from fixtures.common_types import Lsn, TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
Endpoint,
@@ -18,7 +19,7 @@ from fixtures.neon_fixtures import (
NeonEnvBuilder,
last_flush_lsn_upload,
)
from fixtures.pageserver.types import parse_layer_file_name
from fixtures.pageserver.common_types import parse_layer_file_name
from fixtures.pageserver.utils import (
assert_tenant_state,
wait_for_last_record_lsn,
@@ -28,7 +29,6 @@ from fixtures.remote_storage import (
LocalFsStorage,
RemoteStorageKind,
)
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import query_scalar, wait_until

View File

@@ -6,6 +6,7 @@ import threading
import pytest
import requests
from fixtures.common_types import Lsn, TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
@@ -31,7 +32,6 @@ from fixtures.remote_storage import (
RemoteStorageKind,
s3_storage,
)
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import query_scalar, run_pg_bench_small, wait_until
from urllib3.util.retry import Retry

View File

@@ -1,20 +1,25 @@
import datetime
import enum
import tarfile
import time
from concurrent.futures import ThreadPoolExecutor
from hashlib import sha256
from pathlib import Path
from queue import Empty, Queue
from threading import Barrier
from typing import List
from typing import IO, List, Set, Tuple, Union
import pytest
from fixtures.common_types import Lsn, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnvBuilder,
PgBin,
wait_for_last_flush_lsn,
)
from fixtures.pageserver.http import HistoricLayerInfo
from fixtures.pageserver.utils import wait_timeline_detail_404
from fixtures.remote_storage import LocalFsStorage
from fixtures.types import Lsn, TimelineId
def by_end_lsn(info: HistoricLayerInfo) -> Lsn:
@@ -53,6 +58,8 @@ class Branchpoint(str, enum.Enum):
SHUTDOWN_ALLOWED_ERRORS = [
".*initial size calculation failed: downloading failed, possibly for shutdown",
".*failed to freeze and flush: cannot flush frozen layers when flush_loop is not running, state is Exited",
".*logical_size_calculation_task:panic.*: Sequential get failed with Bad state \\(not active\\).*",
".*Task 'initial size calculation' .* panicked.*",
]
@@ -60,7 +67,10 @@ SHUTDOWN_ALLOWED_ERRORS = [
@pytest.mark.parametrize("restart_after", [True, False])
@pytest.mark.parametrize("write_to_branch_first", [True, False])
def test_ancestor_detach_branched_from(
test_output_dir,
pg_distrib_dir,
neon_env_builder: NeonEnvBuilder,
pg_bin: PgBin,
branchpoint: Branchpoint,
restart_after: bool,
write_to_branch_first: bool,
@@ -70,6 +80,7 @@ def test_ancestor_detach_branched_from(
"""
env = neon_env_builder.init_start()
psql_env = {"LD_LIBRARY_PATH": str(pg_distrib_dir / "lib")}
env.pageserver.allowed_errors.extend(SHUTDOWN_ALLOWED_ERRORS)
client = env.pageserver.http_client()
@@ -146,6 +157,20 @@ def test_ancestor_detach_branched_from(
else:
branch_layers = set()
# run fullbackup to make sure there are no off by one errors
# take this on the parent
fullbackup_before = test_output_dir / "fullbackup-before.tar"
cmd = [
"psql",
"--no-psqlrc",
env.pageserver.connstr(),
"-c",
f"fullbackup {env.initial_tenant} {env.initial_timeline} {branch_at}",
"-o",
str(fullbackup_before),
]
pg_bin.run_capture(cmd, env=psql_env)
all_reparented = client.detach_ancestor(env.initial_tenant, timeline_id)
assert all_reparented == set()
@@ -173,9 +198,73 @@ def test_ancestor_detach_branched_from(
# but if nothing was copied, then there is no nice rule.
# there could be a hole in LSNs between copied from the "old main" and the first branch layer.
# take this on the detached, at same lsn
fullbackup_after = test_output_dir / "fullbackup-after.tar"
cmd = [
"psql",
"--no-psqlrc",
env.pageserver.connstr(),
"-c",
f"fullbackup {env.initial_tenant} {timeline_id} {branch_at}",
"-o",
str(fullbackup_after),
]
pg_bin.run_capture(cmd, env=psql_env)
client.timeline_delete(env.initial_tenant, env.initial_timeline)
wait_timeline_detail_404(client, env.initial_tenant, env.initial_timeline, 10, 1.0)
# because we do the fullbackup from ancestor at the branch_lsn, the zenith.signal is always different
# as there is always "PREV_LSN: invalid" for "before"
skip_files = {"zenith.signal"}
tar_cmp(fullbackup_before, fullbackup_after, skip_files)
def tar_cmp(left: Path, right: Path, skip_files: Set[str]):
"""
This is essentially:
lines=$(comm -3 \
<(mkdir left && cd left && tar xf "$left" && find . -type f -print0 | xargs sha256sum | sort -k2) \
<(mkdir right && cd right && tar xf "$right" && find . -type f -print0 | xargs sha256sum | sort -k2) \
| wc -l)
[ "$lines" = "0" ]
But in a more mac friendly fashion.
"""
started_at = time.time()
def hash_extracted(reader: Union[IO[bytes], None]) -> bytes:
assert reader is not None
digest = sha256(usedforsecurity=False)
while True:
buf = reader.read(64 * 1024)
if not buf:
break
digest.update(buf)
return digest.digest()
def build_hash_list(p: Path) -> List[Tuple[str, bytes]]:
with tarfile.open(p) as f:
matching_files = (info for info in f if info.isreg() and info.name not in skip_files)
ret = list(
map(lambda info: (info.name, hash_extracted(f.extractfile(info))), matching_files)
)
ret.sort(key=lambda t: t[0])
return ret
left_list, right_list = map(build_hash_list, [left, right])
try:
assert len(left_list) == len(right_list)
for left_tuple, right_tuple in zip(left_list, right_list):
assert left_tuple == right_tuple
finally:
elapsed = time.time() - started_at
log.info(f"tar_cmp completed in {elapsed}s")
def test_ancestor_detach_reparents_earlier(neon_env_builder: NeonEnvBuilder):
"""

View File

@@ -10,6 +10,7 @@ from typing import Optional
import psycopg2.errors
import psycopg2.extras
import pytest
from fixtures.common_types import TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
Endpoint,
@@ -31,7 +32,6 @@ from fixtures.pageserver.utils import (
from fixtures.pg_version import PgVersion
from fixtures.port_distributor import PortDistributor
from fixtures.remote_storage import RemoteStorageKind
from fixtures.types import TenantId, TimelineId
from fixtures.utils import get_timeline_dir_size, wait_until

View File

@@ -18,6 +18,7 @@ import psycopg2.errors
import psycopg2.extras
import pytest
from fixtures.broker import NeonBroker
from fixtures.common_types import Lsn, TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.metrics import parse_metrics
from fixtures.neon_fixtures import (
@@ -47,7 +48,6 @@ from fixtures.remote_storage import (
)
from fixtures.safekeeper.http import SafekeeperHttpClient
from fixtures.safekeeper.utils import are_walreceivers_absent
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import get_dir_size, query_scalar, start_in_background

View File

@@ -8,10 +8,10 @@ from typing import List, Optional
import asyncpg
import pytest
import toml
from fixtures.common_types import Lsn, TenantId, TimelineId
from fixtures.log_helper import getLogger
from fixtures.neon_fixtures import Endpoint, NeonEnv, NeonEnvBuilder, Safekeeper
from fixtures.remote_storage import RemoteStorageKind
from fixtures.types import Lsn, TenantId, TimelineId
log = getLogger("root.safekeeper_async")

View File

@@ -1,8 +1,8 @@
import time
from fixtures.common_types import Lsn, TenantId
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder
from fixtures.types import Lsn, TenantId
# Checks that pageserver's walreceiver state is printed in the logs during WAL wait timeout.

View File

@@ -6,6 +6,7 @@ from typing import List
import pytest
import zstandard
from fixtures.common_types import Lsn, TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnvBuilder,
@@ -19,7 +20,6 @@ from fixtures.pageserver.utils import (
)
from fixtures.port_distributor import PortDistributor
from fixtures.remote_storage import LocalFsStorage, S3Storage, s3_storage
from fixtures.types import Lsn, TenantId, TimelineId
from mypy_boto3_s3.type_defs import (
ObjectTypeDef,
)

View File

@@ -2,10 +2,10 @@ import time
import psutil
import pytest
from fixtures.common_types import TenantId
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.pageserver.http import PageserverApiException
from fixtures.types import TenantId
def assert_child_processes(pageserver_pid, wal_redo_present=False, defunct_present=False):