Compare commits

..

8 Commits

Author SHA1 Message Date
Alex Chi Z
da8fa73b1b allow anyone reset pg_stat_statements
Signed-off-by: Alex Chi Z <chi@neon.tech>
2023-08-29 16:16:18 +02:00
Joonas Koivunen
d1fcdf75b3 test: enhanced logging for curious mock_s3 (#5134)
Possible flakyness with mock_s3. Add logging in hopes this will happen
again.

Co-authored-by: Alexander Bayandin <alexander@neon.tech>
2023-08-29 14:48:50 +03:00
Alexander Bayandin
7e39a96441 scripts/flaky_tests.py: Improve flaky tests detection (#5094)
## Problem

We still need to rerun some builds manually because flaky tests weren't
detected automatically.
I found two reasons for it:
- If a test is flaky on a particular build type, on a particular
Postgres version, there's a high chance that this test is flaky on all
configurations, but we don't automatically detect such cases.
- We detect flaky tests only on the main branch, which requires manual
retrigger runs for freshly made flaky tests.
Both of them are fixed in the PR.

## Summary of changes
- Spread flakiness of a single test to all configurations
- Detect flaky tests in all branches (not only in the main)
- Look back only at  7 days of test history (instead of 10)
2023-08-29 11:53:24 +01:00
Vadim Kharitonov
babefdd3f9 Upgrade pgvector to 0.5.0 (#5132) 2023-08-29 12:53:50 +03:00
Arpad Müller
805fee1483 page cache: small code cleanups (#5125)
## Problem

I saw these things while working on #5111.

## Summary of changes

* Add a comment explaining why we use `Vec::leak` instead of
`Vec::into_boxed_slice` plus `Box::leak`.
* Add another comment explaining what `valid` is doing, it wasn't very
clear before.
* Add a function `set_usage_count` to not set it directly.
2023-08-29 11:49:04 +03:00
Felix Prasanna
85d6d9dc85 monitor/compute_ctl: remove references to the informant (#5115)
Also added some docs to the monitor :)

Co-authored-by: Em Sharnoff <sharnoff@neon.tech>
2023-08-29 02:59:27 +03:00
Em Sharnoff
e40ee7c3d1 remove unused file 'vm-cgconfig.conf' (#5127)
Honestly no clue why it's still here, should have been removed ages ago.
This is handled by vm-builder now.
2023-08-28 13:04:57 -07:00
Christian Schwarz
0fe3b3646a page cache: don't proactively evict EphemeralFile pages (#5129)
Before this patch, when dropping an EphemeralFile, we'd scan the entire
`slots` to proactively evict its pages (`drop_buffers_for_immutable`).

This was _necessary_ before #4994 because the page cache was a
write-back cache: we'd be deleting the EphemeralFile from disk after,
so, if we hadn't evicted its pages before that, write-back in
`find_victim` wouldhave failed.

But, since #4994, the page cache is a read-only cache, so, it's safe
to keep read-only data cached. It's never going to get accessed again
and eventually, `find_victim` will evict it.

The only remaining advantage of `drop_buffers_for_immutable` over
relying on `find_victim` is that `find_victim` has to do the clock
page replacement iterations until the count reaches 0,
whereas `drop_buffers_for_immutable` can kick the page out right away.

However, weigh that against the cost of `drop_buffers_for_immutable`,
which currently scans the entire `slots` array to find the
EphemeralFile's pages.

Alternatives have been proposed in #5122 and #5128, but, they come
with their own overheads & trade-offs.

Also, the real reason why we're looking into this piece of code is
that we want to make the slots rwlock async in #5023.
Since `drop_buffers_for_immutable` is called from drop, and there
is no async drop, it would be nice to not have to deal with this.

So, let's just stop doing `drop_buffers_for_immutable` and observe
the performance impact in benchmarks.
2023-08-28 20:42:18 +02:00
18 changed files with 151 additions and 188 deletions

View File

@@ -145,7 +145,11 @@ runs:
if [ "${RERUN_FLAKY}" == "true" ]; then
mkdir -p $TEST_OUTPUT
poetry run ./scripts/flaky_tests.py "${TEST_RESULT_CONNSTR}" --days 10 --output "$TEST_OUTPUT/flaky.json"
poetry run ./scripts/flaky_tests.py "${TEST_RESULT_CONNSTR}" \
--days 7 \
--output "$TEST_OUTPUT/flaky.json" \
--pg-version "${DEFAULT_PG_VERSION}" \
--build-type "${BUILD_TYPE}"
EXTRA_PARAMS="--flaky-tests-json $TEST_OUTPUT/flaky.json $EXTRA_PARAMS"
fi

View File

@@ -50,6 +50,8 @@ RUN cd postgres && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/refint.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/xml2.control
RUN for file in /usr/local/pgsql/share/extension/pg_stat_statements--*.sql ; do echo 'GRANT EXECUTE ON FUNCTION pg_stat_statements_reset() TO PUBLIC;' >> $file ; done
#########################################################################################
#
# Layer "postgis-build"
@@ -211,8 +213,8 @@ RUN wget https://github.com/df7cb/postgresql-unit/archive/refs/tags/7.7.tar.gz -
FROM build-deps AS vector-pg-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN wget https://github.com/pgvector/pgvector/archive/refs/tags/v0.4.4.tar.gz -O pgvector.tar.gz && \
echo "1cb70a63f8928e396474796c22a20be9f7285a8a013009deb8152445b61b72e6 pgvector.tar.gz" | sha256sum --check && \
RUN wget https://github.com/pgvector/pgvector/archive/refs/tags/v0.5.0.tar.gz -O pgvector.tar.gz && \
echo "d8aa3504b215467ca528525a6de12c3f85f9891b091ce0e5864dd8a9b757f77b pgvector.tar.gz" | sha256sum --check && \
mkdir pgvector-src && cd pgvector-src && tar xvzf ../pgvector.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \

View File

@@ -19,9 +19,10 @@ Also `compute_ctl` spawns two separate service threads:
- `http-endpoint` runs a Hyper HTTP API server, which serves readiness and the
last activity requests.
If the `vm-informant` binary is present at `/bin/vm-informant`, it will also be started. For VM
compute nodes, `vm-informant` communicates with the VM autoscaling system. It coordinates
downscaling and (eventually) will request immediate upscaling under resource pressure.
If `AUTOSCALING` environment variable is set, `compute_ctl` will start the
`vm-monitor` located in [`neon/libs/vm_monitor`]. For VM compute nodes,
`vm-monitor` communicates with the VM autoscaling system. It coordinates
downscaling and requests immediate upscaling under resource pressure.
Usage example:
```sh

View File

@@ -20,9 +20,10 @@
//! - `http-endpoint` runs a Hyper HTTP API server, which serves readiness and the
//! last activity requests.
//!
//! If the `vm-informant` binary is present at `/bin/vm-informant`, it will also be started. For VM
//! compute nodes, `vm-informant` communicates with the VM autoscaling system. It coordinates
//! downscaling and (eventually) will request immediate upscaling under resource pressure.
//! If `AUTOSCALING` environment variable is set, `compute_ctl` will start the
//! `vm-monitor` located in [`neon/libs/vm_monitor`]. For VM compute nodes,
//! `vm-monitor` communicates with the VM autoscaling system. It coordinates
//! downscaling and requests immediate upscaling under resource pressure.
//!
//! Usage example:
//! ```sh

View File

@@ -16,3 +16,19 @@ in the `neon-postgres` cgroup and set its `memory.{max,high}`.
* See also: [`neondatabase/vm-monitor`](https://github.com/neondatabase/vm-monitor/),
where initial development of the monitor happened. The repository is no longer
maintained but the commit history may be useful for debugging.
## Structure
The `vm-monitor` is loosely comprised of a few systems. These are:
* the server: this is just a simple `axum` server that accepts requests and
upgrades them to websocket connections. The server only allows one connection at
a time. This means that upon receiving a new connection, the server will terminate
and old one if it exists.
* the filecache: a struct that allows communication with the Postgres file cache.
On startup, we connect to the filecache and hold on to the connection for the
entire monitor lifetime.
* the cgroup watcher: the `CgroupWatcher` manages the `neon-postgres` cgroup by
listening for `memory.high` events and setting its `memory.{high,max}` values.
* the runner: the runner marries the filecache and cgroup watcher together,
communicating with the agent throught the `Dispatcher`, and then calling filecache
and cgroup watcher functions as needed to upscale and downscale

View File

@@ -1,7 +1,7 @@
//! Managing the websocket connection and other signals in the monitor.
//!
//! Contains types that manage the interaction (not data interchange, see `protocol`)
//! between informant and monitor, allowing us to to process and send messages in a
//! between agent and monitor, allowing us to to process and send messages in a
//! straightforward way. The dispatcher also manages that signals that come from
//! the cgroup (requesting upscale), and the signals that go to the cgroup
//! (notifying it of upscale).
@@ -24,16 +24,16 @@ use crate::protocol::{
/// The central handler for all communications in the monitor.
///
/// The dispatcher has two purposes:
/// 1. Manage the connection to the informant, sending and receiving messages.
/// 1. Manage the connection to the agent, sending and receiving messages.
/// 2. Communicate with the cgroup manager, notifying it when upscale is received,
/// and sending a message to the informant when the cgroup manager requests
/// and sending a message to the agent when the cgroup manager requests
/// upscale.
#[derive(Debug)]
pub struct Dispatcher {
/// We read informant messages of of `source`
/// We read agent messages of of `source`
pub(crate) source: SplitStream<WebSocket>,
/// We send messages to the informant through `sink`
/// We send messages to the agent through `sink`
sink: SplitSink<WebSocket, Message>,
/// Used to notify the cgroup when we are upscaled.
@@ -43,7 +43,7 @@ pub struct Dispatcher {
/// we send an `UpscaleRequst` to the agent.
pub(crate) request_upscale_events: mpsc::Receiver<()>,
/// The protocol version we have agreed to use with the informant. This is negotiated
/// The protocol version we have agreed to use with the agent. This is negotiated
/// during the creation of the dispatcher, and should be the highest shared protocol
/// version.
///
@@ -56,9 +56,9 @@ pub struct Dispatcher {
impl Dispatcher {
/// Creates a new dispatcher using the passed-in connection.
///
/// Performs a negotiation with the informant to determine the highest protocol
/// Performs a negotiation with the agent to determine the highest protocol
/// version that both support. This consists of two steps:
/// 1. Wait for the informant to sent the range of protocols it supports.
/// 1. Wait for the agent to sent the range of protocols it supports.
/// 2. Send a protocol version that works for us as well, or an error if there
/// is no compatible version.
pub async fn new(
@@ -69,7 +69,7 @@ impl Dispatcher {
let (mut sink, mut source) = stream.split();
// Figure out the highest protocol version we both support
info!("waiting for informant to send protocol version range");
info!("waiting for agent to send protocol version range");
let Some(message) = source.next().await else {
bail!("websocket connection closed while performing protocol handshake")
};
@@ -79,7 +79,7 @@ impl Dispatcher {
let Message::Text(message_text) = message else {
// All messages should be in text form, since we don't do any
// pinging/ponging. See nhooyr/websocket's implementation and the
// informant/agent for more info
// agent for more info
bail!("received non-text message during proocol handshake: {message:?}")
};
@@ -88,32 +88,30 @@ impl Dispatcher {
max: PROTOCOL_MAX_VERSION,
};
let informant_range: ProtocolRange = serde_json::from_str(&message_text)
let agent_range: ProtocolRange = serde_json::from_str(&message_text)
.context("failed to deserialize protocol version range")?;
info!(range = ?informant_range, "received protocol version range");
info!(range = ?agent_range, "received protocol version range");
let highest_shared_version = match monitor_range.highest_shared_version(&informant_range) {
let highest_shared_version = match monitor_range.highest_shared_version(&agent_range) {
Ok(version) => {
sink.send(Message::Text(
serde_json::to_string(&ProtocolResponse::Version(version)).unwrap(),
))
.await
.context("failed to notify informant of negotiated protocol version")?;
.context("failed to notify agent of negotiated protocol version")?;
version
}
Err(e) => {
sink.send(Message::Text(
serde_json::to_string(&ProtocolResponse::Error(format!(
"Received protocol version range {} which does not overlap with {}",
informant_range, monitor_range
agent_range, monitor_range
)))
.unwrap(),
))
.await
.context(
"failed to notify informant of no overlap between protocol version ranges",
)?;
.context("failed to notify agent of no overlap between protocol version ranges")?;
Err(e).context("error determining suitable protocol version range")?
}
};
@@ -137,7 +135,7 @@ impl Dispatcher {
.context("failed to send resources and oneshot sender across channel")
}
/// Send a message to the informant.
/// Send a message to the agent.
///
/// Although this function is small, it has one major benefit: it is the only
/// way to send data accross the connection, and you can only pass in a proper

View File

@@ -146,7 +146,7 @@ pub async fn start(args: &'static Args, token: CancellationToken) -> anyhow::Res
/// Handles incoming websocket connections.
///
/// If we are already to connected to an informant, we kill that old connection
/// If we are already to connected to an agent, we kill that old connection
/// and accept the new one.
#[tracing::instrument(name = "/monitor", skip_all, fields(?args))]
pub async fn ws_handler(
@@ -196,7 +196,7 @@ async fn start_monitor(
return;
}
};
info!("connected to informant");
info!("connected to agent");
match monitor.run().await {
Ok(()) => info!("monitor was killed due to new connection"),

View File

@@ -1,13 +1,13 @@
//! Types representing protocols and actual informant-monitor messages.
//! Types representing protocols and actual agent-monitor messages.
//!
//! The pervasive use of serde modifiers throughout this module is to ease
//! serialization on the go side. Because go does not have enums (which model
//! messages well), it is harder to model messages, and we accomodate that with
//! serde.
//!
//! *Note*: the informant sends and receives messages in different ways.
//! *Note*: the agent sends and receives messages in different ways.
//!
//! The informant serializes messages in the form and then sends them. The use
//! The agent serializes messages in the form and then sends them. The use
//! of `#[serde(tag = "type", content = "content")]` allows us to use `Type`
//! to determine how to deserialize `Content`.
//! ```ignore
@@ -25,9 +25,9 @@
//! Id uint64
//! }
//! ```
//! After reading the type field, the informant will decode the entire message
//! After reading the type field, the agent will decode the entire message
//! again, this time into the correct type using the embedded fields.
//! Because the informant cannot just extract the json contained in a certain field
//! Because the agent cannot just extract the json contained in a certain field
//! (it initially deserializes to `map[string]interface{}`), we keep the fields
//! at the top level, so the entire piece of json can be deserialized into a struct,
//! such as a `DownscaleResult`, with the `Type` and `Id` fields ignored.
@@ -37,7 +37,7 @@ use std::cmp;
use serde::{de::Error, Deserialize, Serialize};
/// A Message we send to the informant.
/// A Message we send to the agent.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct OutboundMsg {
#[serde(flatten)]
@@ -51,31 +51,31 @@ impl OutboundMsg {
}
}
/// The different underlying message types we can send to the informant.
/// The different underlying message types we can send to the agent.
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(tag = "type")]
pub enum OutboundMsgKind {
/// Indicates that the informant sent an invalid message, i.e, we couldn't
/// Indicates that the agent sent an invalid message, i.e, we couldn't
/// properly deserialize it.
InvalidMessage { error: String },
/// Indicates that we experienced an internal error while processing a message.
/// For example, if a cgroup operation fails while trying to handle an upscale,
/// we return `InternalError`.
InternalError { error: String },
/// Returned to the informant once we have finished handling an upscale. If the
/// Returned to the agent once we have finished handling an upscale. If the
/// handling was unsuccessful, an `InternalError` will get returned instead.
/// *Note*: this is a struct variant because of the way go serializes struct{}
UpscaleConfirmation {},
/// Indicates to the monitor that we are urgently requesting resources.
/// *Note*: this is a struct variant because of the way go serializes struct{}
UpscaleRequest {},
/// Returned to the informant once we have finished attempting to downscale. If
/// Returned to the agent once we have finished attempting to downscale. If
/// an error occured trying to do so, an `InternalError` will get returned instead.
/// However, if we are simply unsuccessful (for example, do to needing the resources),
/// that gets included in the `DownscaleResult`.
DownscaleResult {
// FIXME for the future (once the informant is deprecated)
// As of the time of writing, the informant/agent version of this struct is
// As of the time of writing, the agent/informant version of this struct is
// called api.DownscaleResult. This struct has uppercase fields which are
// serialized as such. Thus, we serialize using uppercase names so we don't
// have to make a breaking change to the agent<->informant protocol. Once
@@ -88,12 +88,12 @@ pub enum OutboundMsgKind {
status: String,
},
/// Part of the bidirectional heartbeat. The heartbeat is initiated by the
/// informant.
/// agent.
/// *Note*: this is a struct variant because of the way go serializes struct{}
HealthCheck {},
}
/// A message received form the informant.
/// A message received form the agent.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct InboundMsg {
#[serde(flatten)]
@@ -101,7 +101,7 @@ pub struct InboundMsg {
pub(crate) id: usize,
}
/// The different underlying message types we can receive from the informant.
/// The different underlying message types we can receive from the agent.
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(tag = "type", content = "content")]
pub enum InboundMsgKind {
@@ -120,14 +120,14 @@ pub enum InboundMsgKind {
/// when done.
DownscaleRequest { target: Resources },
/// Part of the bidirectional heartbeat. The heartbeat is initiated by the
/// informant.
/// agent.
/// *Note*: this is a struct variant because of the way go serializes struct{}
HealthCheck {},
}
/// Represents the resources granted to a VM.
#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
// Renamed because the agent/informant has multiple resources types:
// Renamed because the agent has multiple resources types:
// `Resources` (milliCPU/memory slots)
// `Allocation` (vCPU/bytes) <- what we correspond to
#[serde(rename(serialize = "Allocation", deserialize = "Allocation"))]
@@ -151,7 +151,7 @@ pub const PROTOCOL_MAX_VERSION: ProtocolVersion = ProtocolVersion::V1_0;
pub struct ProtocolVersion(u8);
impl ProtocolVersion {
/// Represents v1.0 of the informant<-> monitor protocol - the initial version
/// Represents v1.0 of the agent<-> monitor protocol - the initial version
///
/// Currently the latest version.
const V1_0: ProtocolVersion = ProtocolVersion(1);

View File

@@ -1,4 +1,4 @@
//! Exposes the `Runner`, which handles messages received from informant and
//! Exposes the `Runner`, which handles messages received from agent and
//! sends upscale requests.
//!
//! This is the "Monitor" part of the monitor binary and is the main entrypoint for
@@ -21,8 +21,8 @@ use crate::filecache::{FileCacheConfig, FileCacheState};
use crate::protocol::{InboundMsg, InboundMsgKind, OutboundMsg, OutboundMsgKind, Resources};
use crate::{bytes_to_mebibytes, get_total_system_memory, spawn_with_cancel, Args, MiB};
/// Central struct that interacts with informant, dispatcher, and cgroup to handle
/// signals from the informant.
/// Central struct that interacts with agent, dispatcher, and cgroup to handle
/// signals from the agent.
#[derive(Debug)]
pub struct Runner {
config: Config,
@@ -371,7 +371,7 @@ impl Runner {
Ok(None)
}
InboundMsgKind::InternalError { error } => {
warn!(error, id, "informant experienced an internal error");
warn!(error, id, "agent experienced an internal error");
Ok(None)
}
InboundMsgKind::HealthCheck {} => {
@@ -405,7 +405,7 @@ impl Runner {
.await
.context("failed to send message")?;
}
// there is a message from the informant
// there is a message from the agent
msg = self.dispatcher.source.next() => {
if let Some(msg) = msg {
// Don't use 'message' as a key as the string also uses
@@ -422,7 +422,7 @@ impl Runner {
// Don't use 'message' as a key as the
// string also uses that for its key
msg = ?other,
"informant should only send text messages but received different type"
"agent should only send text messages but received different type"
);
continue
},

View File

@@ -99,7 +99,6 @@ async fn get_holes(path: &Path, max_holes: usize) -> Result<Vec<Hole>> {
let file = FileBlockReader::new(VirtualFile::open(path)?);
let summary_blk = file.read_blk(0)?;
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
drop(summary_blk); // so we don't borrow `file` for too long
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
actual_summary.index_start_blk,
actual_summary.index_root_blk,

View File

@@ -77,7 +77,7 @@ use std::{
convert::TryInto,
sync::{
atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering},
RwLock, RwLockReadGuard, RwLockWriteGuard, TryLockError, Arc, TryLockResult,
RwLock, RwLockReadGuard, RwLockWriteGuard, TryLockError,
},
};
@@ -203,9 +203,12 @@ impl Slot {
Err(usage_count) => usage_count,
}
}
}
type ImmutablePageMap = HashMap<FileId, Arc<RwLock<HashMap<u32, usize>>>>;
/// Sets the usage count to a specific value.
fn set_usage_count(&self, count: u8) {
self.usage_count.store(count, Ordering::Relaxed);
}
}
pub struct PageCache {
/// This contains the mapping from the cache key to buffer slot that currently
@@ -219,7 +222,7 @@ pub struct PageCache {
/// can have a separate mapping map, next to this field.
materialized_page_map: RwLock<HashMap<MaterializedPageHashKey, Vec<Version>>>,
immutable_page_map: RwLock<ImmutablePageMap>,
immutable_page_map: RwLock<HashMap<(FileId, u32), usize>>,
/// The actual buffers with their metadata.
slots: Box<[Slot]>,
@@ -265,6 +268,7 @@ pub struct PageWriteGuard<'i> {
inner: RwLockWriteGuard<'i, SlotInner>,
// Are the page contents currently valid?
// Used to mark pages as invalid that are assigned but not yet filled with data.
valid: bool,
}
@@ -427,60 +431,6 @@ impl PageCache {
self.lock_for_read(&mut cache_key)
}
/// Immediately drop all buffers belonging to given file
pub fn drop_buffers_for_immutable(&self, drop_file_id: FileId) {
let map = self.immutable_page_map.read().unwrap();
let Some(block_nos_arc_rwl) = map.get(&drop_file_id).map(Arc::clone) else {
return
};
drop(map); // avoid contention on immutable_page_map
let mut block_nos_guard = block_nos_arc_rwl.write().unwrap();
block_nos_guard.retain(|block_no, slot_idx| {
let expect_cache_key = CacheKey::ImmutableFilePage {
file_id: drop_file_id,
blkno: *block_no,
};
let slot = &self.slots[*slot_idx];
match slot.inner.try_write() {
TryLockResult::Ok(mut inner) => {
// check again, could have been taken since added to immutable_page_map
if inner.key == Some(expect_cache_key) {
// immutable_page_map was still in sync with reality
// TODO: find a way to share code with `remove_mapping`
self.size_metrics.current_bytes_immutable.sub_page_sz(1);
inner.key = None;
}
false
}
TryLockResult::Err(e @ TryLockError::Poisoned(_)) => {
panic!("slot lock {slot_idx} poisoned: {e:?}")
}
TryLockResult::Err(TryLockError::WouldBlock) => {
// This function is only called from the `Drop` impl of EphemeralFile.
// So, there shouldn't be any page cache users that are reading from
// that EphemeralFile anymore, because,
// 1. EphemeralFile doesn't hand out page cache read guards that outlive EphemeralFile and
// 2. there can't be any concurrent `EphemeralFile::reads`s because it's being dropped.
//
// So, the only reason why the page is locked right now is `find_victim`
// trying to claim this page.
// So, let `find_victim` do the work.
// Also, it will call `remove_mapping()` to remove the (file_id, block_no)
// from the immutable_page_map; so, keep the entry in the map here, otherwise
// the `remove_mapping()` call will panic.
true
}
}
});
if block_nos_guard.is_empty() {
drop(block_nos_guard); // remove_mapping_immutable_file_common re-aquires the lock
drop(block_nos_arc_rwl);
let mut map = self.immutable_page_map.write().unwrap();
self.remove_mapping_immutable_file_common(&mut map, drop_file_id)
}
}
//
// Section 2: Internal interface functions for lookup/update.
//
@@ -591,7 +541,7 @@ impl PageCache {
// Make the slot ready
let slot = &self.slots[slot_idx];
inner.key = Some(cache_key.clone());
slot.usage_count.store(1, Ordering::Relaxed);
slot.set_usage_count(1);
return Ok(ReadBufResult::NotFound(PageWriteGuard {
inner,
@@ -652,7 +602,7 @@ impl PageCache {
// Make the slot ready
let slot = &self.slots[slot_idx];
inner.key = Some(cache_key.clone());
slot.usage_count.store(1, Ordering::Relaxed);
slot.set_usage_count(1);
return Ok(WriteBufResult::NotFound(PageWriteGuard {
inner,
@@ -692,9 +642,7 @@ impl PageCache {
}
CacheKey::ImmutableFilePage { file_id, blkno } => {
let map = self.immutable_page_map.read().unwrap();
let block_nos = map.get(file_id)?;
let block_nos = block_nos.read().unwrap();
block_nos.get(blkno).copied()
Some(*map.get(&(*file_id, *blkno))?)
}
}
}
@@ -717,9 +665,7 @@ impl PageCache {
}
CacheKey::ImmutableFilePage { file_id, blkno } => {
let map = self.immutable_page_map.read().unwrap();
let block_nos = map.get(file_id)?;
let block_nos = block_nos.read().unwrap();
block_nos.get(blkno).copied()
Some(*map.get(&(*file_id, *blkno))?)
}
}
}
@@ -751,39 +697,14 @@ impl PageCache {
}
}
CacheKey::ImmutableFilePage { file_id, blkno } => {
let map = self.immutable_page_map.read().unwrap();
let block_nos = map.get(file_id).expect("could not find file_id in mapping");
let mut block_nos = block_nos.write().unwrap();
block_nos
.remove(blkno)
.expect("could not find blkno in mapping");
let mut map = self.immutable_page_map.write().unwrap();
map.remove(&(*file_id, *blkno))
.expect("could not find old key in mapping");
self.size_metrics.current_bytes_immutable.sub_page_sz(1);
if block_nos.is_empty() {
drop(block_nos);
// re-lock map in write mode
drop(map);
let mut map = self.immutable_page_map.write().unwrap();
self.remove_mapping_immutable_file_common(&mut map, *file_id)
}
}
}
}
fn remove_mapping_immutable_file_common(
&self,
map: &mut RwLockWriteGuard<ImmutablePageMap>,
file_id: FileId,
) {
let Some(block_nos_rwl) = map.get(&file_id) else {
return;
};
let block_nos = block_nos_rwl.read().unwrap();
if block_nos.is_empty() {
drop(block_nos);
map.remove(&file_id);
}
}
///
/// Insert mapping for given key.
///
@@ -817,9 +738,7 @@ impl PageCache {
CacheKey::ImmutableFilePage { file_id, blkno } => {
let mut map = self.immutable_page_map.write().unwrap();
let block_nos = map.entry(*file_id).or_default();
let mut block_nos = block_nos.write().unwrap();
match block_nos.entry(*blkno) {
match map.entry((*file_id, *blkno)) {
Entry::Occupied(entry) => Some(*entry.get()),
Entry::Vacant(entry) => {
entry.insert(slot_idx);
@@ -882,6 +801,8 @@ impl PageCache {
fn new(num_pages: usize) -> Self {
assert!(num_pages > 0, "page cache size must be > 0");
// We use Box::leak here and into_boxed_slice to avoid leaking uninitialized
// memory that Vec's might contain.
let page_buffer = Box::leak(vec![0u8; num_pages * PAGE_SZ].into_boxed_slice());
let size_metrics = &crate::metrics::PAGE_CACHE_SIZE;

View File

@@ -36,9 +36,7 @@ where
/// Reference to an in-memory copy of an immutable on-disk block.
pub enum BlockLease<'a> {
/// [crate::page_cache::PageCache::drop_buffers_for_immutable] relies on the read guard
/// not outiving the EphemeralFile. See the comment in there for details.
PageReadGuard(PageReadGuard<'a>),
PageReadGuard(PageReadGuard<'static>),
EphemeralFileMutableTail(&'a [u8; PAGE_SZ]),
#[cfg(test)]
Rc(std::rc::Rc<[u8; PAGE_SZ]>),

View File

@@ -221,9 +221,8 @@ pub fn is_ephemeral_file(filename: &str) -> bool {
impl Drop for EphemeralFile {
fn drop(&mut self) {
// drop all pages from page cache
let cache = page_cache::get();
cache.drop_buffers_for_immutable(self.page_cache_file_id);
// There might still be pages in the [`crate::page_cache`] for this file.
// We leave them there, [`crate::page_cache::PageCache::find_victim`] will evict them when needed.
// unlink the file
let res = std::fs::remove_file(&self.file.path);

View File

@@ -848,7 +848,6 @@ impl DeltaLayerInner {
let summary_blk = file.read_blk(0)?;
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
drop(summary_blk); // so we don't borrow `file` for too long
if let Some(mut expected_summary) = summary {
// production code path

View File

@@ -442,7 +442,6 @@ impl ImageLayerInner {
let file = FileBlockReader::new(file);
let summary_blk = file.read_blk(0)?;
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
drop(summary_blk); // so we don't borrow `file` for too long
if let Some(mut expected_summary) = summary {
// production code path

View File

@@ -12,25 +12,26 @@ import psycopg2.extras
# We call the test "flaky" if it failed at least once on the main branch in the last N=10 days.
FLAKY_TESTS_QUERY = """
SELECT
DISTINCT parent_suite, suite, test
DISTINCT parent_suite, suite, REGEXP_REPLACE(test, '(release|debug)-pg(\\d+)-?', '') as deparametrized_test
FROM
(
SELECT
revision,
jsonb_array_elements(data -> 'children') -> 'name' as parent_suite,
jsonb_array_elements(jsonb_array_elements(data -> 'children') -> 'children') -> 'name' as suite,
jsonb_array_elements(jsonb_array_elements(jsonb_array_elements(data -> 'children') -> 'children') -> 'children') -> 'name' as test,
jsonb_array_elements(jsonb_array_elements(jsonb_array_elements(data -> 'children') -> 'children') -> 'children') -> 'status' as status,
jsonb_array_elements(jsonb_array_elements(jsonb_array_elements(data -> 'children') -> 'children') -> 'children') -> 'retriesStatusChange' as retries_status_change,
to_timestamp((jsonb_array_elements(jsonb_array_elements(jsonb_array_elements(data -> 'children') -> 'children') -> 'children') -> 'time' -> 'start')::bigint / 1000)::date as timestamp
reference,
jsonb_array_elements(data -> 'children') ->> 'name' as parent_suite,
jsonb_array_elements(jsonb_array_elements(data -> 'children') -> 'children') ->> 'name' as suite,
jsonb_array_elements(jsonb_array_elements(jsonb_array_elements(data -> 'children') -> 'children') -> 'children') ->> 'name' as test,
jsonb_array_elements(jsonb_array_elements(jsonb_array_elements(data -> 'children') -> 'children') -> 'children') ->> 'status' as status,
jsonb_array_elements(jsonb_array_elements(jsonb_array_elements(data -> 'children') -> 'children') -> 'children') ->> 'retriesStatusChange' as retries_status_change,
to_timestamp((jsonb_array_elements(jsonb_array_elements(jsonb_array_elements(data -> 'children') -> 'children') -> 'children') -> 'time' ->> 'start')::bigint / 1000)::date as timestamp
FROM
regress_test_results
WHERE
reference = 'refs/heads/main'
) data
WHERE
timestamp > CURRENT_DATE - INTERVAL '%s' day
AND (status::text IN ('"failed"', '"broken"') OR retries_status_change::boolean)
AND (
(status IN ('failed', 'broken') AND reference = 'refs/heads/main')
OR retries_status_change::boolean
)
;
"""
@@ -40,6 +41,9 @@ def main(args: argparse.Namespace):
interval_days = args.days
output = args.output
build_type = args.build_type
pg_version = args.pg_version
res: DefaultDict[str, DefaultDict[str, Dict[str, bool]]]
res = defaultdict(lambda: defaultdict(dict))
@@ -55,8 +59,21 @@ def main(args: argparse.Namespace):
rows = []
for row in rows:
logging.info(f"\t{row['parent_suite'].replace('.', '/')}/{row['suite']}.py::{row['test']}")
res[row["parent_suite"]][row["suite"]][row["test"]] = True
# We don't want to automatically rerun tests in a performance suite
if row["parent_suite"] != "test_runner.regress":
continue
deparametrized_test = row["deparametrized_test"]
dash_if_needed = "" if deparametrized_test.endswith("[]") else "-"
parametrized_test = deparametrized_test.replace(
"[",
f"[{build_type}-pg{pg_version}{dash_if_needed}",
)
res[row["parent_suite"]][row["suite"]][parametrized_test] = True
logging.info(
f"\t{row['parent_suite'].replace('.', '/')}/{row['suite']}.py::{parametrized_test}"
)
logging.info(f"saving results to {output.name}")
json.dump(res, output, indent=2)
@@ -77,6 +94,18 @@ if __name__ == "__main__":
type=int,
help="how many days to look back for flaky tests (default: 10)",
)
parser.add_argument(
"--build-type",
required=True,
type=str,
help="for which build type to create list of flaky tests (debug or release)",
)
parser.add_argument(
"--pg-version",
required=True,
type=int,
help="for which Postgres version to create list of flaky tests (14, 15, etc.)",
)
parser.add_argument(
"connstr",
help="connection string to the test results database",

View File

@@ -233,10 +233,19 @@ if TYPE_CHECKING:
def assert_prefix_empty(neon_env_builder: "NeonEnvBuilder", prefix: Optional[str] = None):
response = list_prefix(neon_env_builder, prefix)
objects = response.get("Contents")
assert (
response["KeyCount"] == 0
), f"remote dir with prefix {prefix} is not empty after deletion: {objects}"
keys = response["KeyCount"]
objects = response.get("Contents", [])
if keys != 0 and len(objects) == 0:
# this has been seen in one case with mock_s3:
# https://neon-github-public-dev.s3.amazonaws.com/reports/pr-4938/6000769714/index.html#suites/3556ed71f2d69272a7014df6dcb02317/ca01e4f4d8d9a11f
# looking at moto impl, it might be there's a race with common prefix (sub directory) not going away with deletes
common_prefixes = response.get("CommonPrefixes", [])
log.warn(
f"contradicting ListObjectsV2 response with KeyCount={keys} and Contents={objects}, CommonPrefixes={common_prefixes}"
)
assert keys == 0, f"remote dir with prefix {prefix} is not empty after deletion: {objects}"
def assert_prefix_not_empty(neon_env_builder: "NeonEnvBuilder", prefix: Optional[str] = None):

View File

@@ -1,12 +0,0 @@
# Configuration for cgroups in VM compute nodes
group neon-postgres {
perm {
admin {
uid = vm-informant;
}
task {
gid = users;
}
}
memory {}
}