Compare commits

..

9 Commits

Author SHA1 Message Date
Alex Chi Z
da8fa73b1b allow anyone reset pg_stat_statements
Signed-off-by: Alex Chi Z <chi@neon.tech>
2023-08-29 16:16:18 +02:00
Joonas Koivunen
d1fcdf75b3 test: enhanced logging for curious mock_s3 (#5134)
Possible flakyness with mock_s3. Add logging in hopes this will happen
again.

Co-authored-by: Alexander Bayandin <alexander@neon.tech>
2023-08-29 14:48:50 +03:00
Alexander Bayandin
7e39a96441 scripts/flaky_tests.py: Improve flaky tests detection (#5094)
## Problem

We still need to rerun some builds manually because flaky tests weren't
detected automatically.
I found two reasons for it:
- If a test is flaky on a particular build type, on a particular
Postgres version, there's a high chance that this test is flaky on all
configurations, but we don't automatically detect such cases.
- We detect flaky tests only on the main branch, which requires manual
retrigger runs for freshly made flaky tests.
Both of them are fixed in the PR.

## Summary of changes
- Spread flakiness of a single test to all configurations
- Detect flaky tests in all branches (not only in the main)
- Look back only at  7 days of test history (instead of 10)
2023-08-29 11:53:24 +01:00
Vadim Kharitonov
babefdd3f9 Upgrade pgvector to 0.5.0 (#5132) 2023-08-29 12:53:50 +03:00
Arpad Müller
805fee1483 page cache: small code cleanups (#5125)
## Problem

I saw these things while working on #5111.

## Summary of changes

* Add a comment explaining why we use `Vec::leak` instead of
`Vec::into_boxed_slice` plus `Box::leak`.
* Add another comment explaining what `valid` is doing, it wasn't very
clear before.
* Add a function `set_usage_count` to not set it directly.
2023-08-29 11:49:04 +03:00
Felix Prasanna
85d6d9dc85 monitor/compute_ctl: remove references to the informant (#5115)
Also added some docs to the monitor :)

Co-authored-by: Em Sharnoff <sharnoff@neon.tech>
2023-08-29 02:59:27 +03:00
Em Sharnoff
e40ee7c3d1 remove unused file 'vm-cgconfig.conf' (#5127)
Honestly no clue why it's still here, should have been removed ages ago.
This is handled by vm-builder now.
2023-08-28 13:04:57 -07:00
Christian Schwarz
0fe3b3646a page cache: don't proactively evict EphemeralFile pages (#5129)
Before this patch, when dropping an EphemeralFile, we'd scan the entire
`slots` to proactively evict its pages (`drop_buffers_for_immutable`).

This was _necessary_ before #4994 because the page cache was a
write-back cache: we'd be deleting the EphemeralFile from disk after,
so, if we hadn't evicted its pages before that, write-back in
`find_victim` wouldhave failed.

But, since #4994, the page cache is a read-only cache, so, it's safe
to keep read-only data cached. It's never going to get accessed again
and eventually, `find_victim` will evict it.

The only remaining advantage of `drop_buffers_for_immutable` over
relying on `find_victim` is that `find_victim` has to do the clock
page replacement iterations until the count reaches 0,
whereas `drop_buffers_for_immutable` can kick the page out right away.

However, weigh that against the cost of `drop_buffers_for_immutable`,
which currently scans the entire `slots` array to find the
EphemeralFile's pages.

Alternatives have been proposed in #5122 and #5128, but, they come
with their own overheads & trade-offs.

Also, the real reason why we're looking into this piece of code is
that we want to make the slots rwlock async in #5023.
Since `drop_buffers_for_immutable` is called from drop, and there
is no async drop, it would be nice to not have to deal with this.

So, let's just stop doing `drop_buffers_for_immutable` and observe
the performance impact in benchmarks.
2023-08-28 20:42:18 +02:00
Em Sharnoff
529f8b5016 compute_ctl: Fix switched vm-monitor args (#5117)
Small switcheroo from #4946.
2023-08-28 14:55:41 +02:00
15 changed files with 287 additions and 182 deletions

View File

@@ -145,7 +145,11 @@ runs:
if [ "${RERUN_FLAKY}" == "true" ]; then
mkdir -p $TEST_OUTPUT
poetry run ./scripts/flaky_tests.py "${TEST_RESULT_CONNSTR}" --days 10 --output "$TEST_OUTPUT/flaky.json"
poetry run ./scripts/flaky_tests.py "${TEST_RESULT_CONNSTR}" \
--days 7 \
--output "$TEST_OUTPUT/flaky.json" \
--pg-version "${DEFAULT_PG_VERSION}" \
--build-type "${BUILD_TYPE}"
EXTRA_PARAMS="--flaky-tests-json $TEST_OUTPUT/flaky.json $EXTRA_PARAMS"
fi

View File

@@ -50,6 +50,8 @@ RUN cd postgres && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/refint.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/xml2.control
RUN for file in /usr/local/pgsql/share/extension/pg_stat_statements--*.sql ; do echo 'GRANT EXECUTE ON FUNCTION pg_stat_statements_reset() TO PUBLIC;' >> $file ; done
#########################################################################################
#
# Layer "postgis-build"
@@ -211,8 +213,8 @@ RUN wget https://github.com/df7cb/postgresql-unit/archive/refs/tags/7.7.tar.gz -
FROM build-deps AS vector-pg-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN wget https://github.com/pgvector/pgvector/archive/refs/tags/v0.4.4.tar.gz -O pgvector.tar.gz && \
echo "1cb70a63f8928e396474796c22a20be9f7285a8a013009deb8152445b61b72e6 pgvector.tar.gz" | sha256sum --check && \
RUN wget https://github.com/pgvector/pgvector/archive/refs/tags/v0.5.0.tar.gz -O pgvector.tar.gz && \
echo "d8aa3504b215467ca528525a6de12c3f85f9891b091ce0e5864dd8a9b757f77b pgvector.tar.gz" | sha256sum --check && \
mkdir pgvector-src && cd pgvector-src && tar xvzf ../pgvector.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \

View File

@@ -19,9 +19,10 @@ Also `compute_ctl` spawns two separate service threads:
- `http-endpoint` runs a Hyper HTTP API server, which serves readiness and the
last activity requests.
If the `vm-informant` binary is present at `/bin/vm-informant`, it will also be started. For VM
compute nodes, `vm-informant` communicates with the VM autoscaling system. It coordinates
downscaling and (eventually) will request immediate upscaling under resource pressure.
If `AUTOSCALING` environment variable is set, `compute_ctl` will start the
`vm-monitor` located in [`neon/libs/vm_monitor`]. For VM compute nodes,
`vm-monitor` communicates with the VM autoscaling system. It coordinates
downscaling and requests immediate upscaling under resource pressure.
Usage example:
```sh

View File

@@ -20,9 +20,10 @@
//! - `http-endpoint` runs a Hyper HTTP API server, which serves readiness and the
//! last activity requests.
//!
//! If the `vm-informant` binary is present at `/bin/vm-informant`, it will also be started. For VM
//! compute nodes, `vm-informant` communicates with the VM autoscaling system. It coordinates
//! downscaling and (eventually) will request immediate upscaling under resource pressure.
//! If `AUTOSCALING` environment variable is set, `compute_ctl` will start the
//! `vm-monitor` located in [`neon/libs/vm_monitor`]. For VM compute nodes,
//! `vm-monitor` communicates with the VM autoscaling system. It coordinates
//! downscaling and requests immediate upscaling under resource pressure.
//!
//! Usage example:
//! ```sh
@@ -278,8 +279,8 @@ fn main() -> Result<()> {
use tokio_util::sync::CancellationToken;
use tracing::warn;
let vm_monitor_addr = matches.get_one::<String>("vm-monitor-addr");
let cgroup = matches.get_one::<String>("filecache-connstr");
let file_cache_connstr = matches.get_one::<String>("cgroup");
let file_cache_connstr = matches.get_one::<String>("filecache-connstr");
let cgroup = matches.get_one::<String>("cgroup");
// Only make a runtime if we need to.
// Note: it seems like you can make a runtime in an inner scope and

View File

@@ -16,3 +16,19 @@ in the `neon-postgres` cgroup and set its `memory.{max,high}`.
* See also: [`neondatabase/vm-monitor`](https://github.com/neondatabase/vm-monitor/),
where initial development of the monitor happened. The repository is no longer
maintained but the commit history may be useful for debugging.
## Structure
The `vm-monitor` is loosely comprised of a few systems. These are:
* the server: this is just a simple `axum` server that accepts requests and
upgrades them to websocket connections. The server only allows one connection at
a time. This means that upon receiving a new connection, the server will terminate
and old one if it exists.
* the filecache: a struct that allows communication with the Postgres file cache.
On startup, we connect to the filecache and hold on to the connection for the
entire monitor lifetime.
* the cgroup watcher: the `CgroupWatcher` manages the `neon-postgres` cgroup by
listening for `memory.high` events and setting its `memory.{high,max}` values.
* the runner: the runner marries the filecache and cgroup watcher together,
communicating with the agent throught the `Dispatcher`, and then calling filecache
and cgroup watcher functions as needed to upscale and downscale

View File

@@ -1,7 +1,7 @@
//! Managing the websocket connection and other signals in the monitor.
//!
//! Contains types that manage the interaction (not data interchange, see `protocol`)
//! between informant and monitor, allowing us to to process and send messages in a
//! between agent and monitor, allowing us to to process and send messages in a
//! straightforward way. The dispatcher also manages that signals that come from
//! the cgroup (requesting upscale), and the signals that go to the cgroup
//! (notifying it of upscale).
@@ -24,16 +24,16 @@ use crate::protocol::{
/// The central handler for all communications in the monitor.
///
/// The dispatcher has two purposes:
/// 1. Manage the connection to the informant, sending and receiving messages.
/// 1. Manage the connection to the agent, sending and receiving messages.
/// 2. Communicate with the cgroup manager, notifying it when upscale is received,
/// and sending a message to the informant when the cgroup manager requests
/// and sending a message to the agent when the cgroup manager requests
/// upscale.
#[derive(Debug)]
pub struct Dispatcher {
/// We read informant messages of of `source`
/// We read agent messages of of `source`
pub(crate) source: SplitStream<WebSocket>,
/// We send messages to the informant through `sink`
/// We send messages to the agent through `sink`
sink: SplitSink<WebSocket, Message>,
/// Used to notify the cgroup when we are upscaled.
@@ -43,7 +43,7 @@ pub struct Dispatcher {
/// we send an `UpscaleRequst` to the agent.
pub(crate) request_upscale_events: mpsc::Receiver<()>,
/// The protocol version we have agreed to use with the informant. This is negotiated
/// The protocol version we have agreed to use with the agent. This is negotiated
/// during the creation of the dispatcher, and should be the highest shared protocol
/// version.
///
@@ -56,9 +56,9 @@ pub struct Dispatcher {
impl Dispatcher {
/// Creates a new dispatcher using the passed-in connection.
///
/// Performs a negotiation with the informant to determine the highest protocol
/// Performs a negotiation with the agent to determine the highest protocol
/// version that both support. This consists of two steps:
/// 1. Wait for the informant to sent the range of protocols it supports.
/// 1. Wait for the agent to sent the range of protocols it supports.
/// 2. Send a protocol version that works for us as well, or an error if there
/// is no compatible version.
pub async fn new(
@@ -69,7 +69,7 @@ impl Dispatcher {
let (mut sink, mut source) = stream.split();
// Figure out the highest protocol version we both support
info!("waiting for informant to send protocol version range");
info!("waiting for agent to send protocol version range");
let Some(message) = source.next().await else {
bail!("websocket connection closed while performing protocol handshake")
};
@@ -79,7 +79,7 @@ impl Dispatcher {
let Message::Text(message_text) = message else {
// All messages should be in text form, since we don't do any
// pinging/ponging. See nhooyr/websocket's implementation and the
// informant/agent for more info
// agent for more info
bail!("received non-text message during proocol handshake: {message:?}")
};
@@ -88,32 +88,30 @@ impl Dispatcher {
max: PROTOCOL_MAX_VERSION,
};
let informant_range: ProtocolRange = serde_json::from_str(&message_text)
let agent_range: ProtocolRange = serde_json::from_str(&message_text)
.context("failed to deserialize protocol version range")?;
info!(range = ?informant_range, "received protocol version range");
info!(range = ?agent_range, "received protocol version range");
let highest_shared_version = match monitor_range.highest_shared_version(&informant_range) {
let highest_shared_version = match monitor_range.highest_shared_version(&agent_range) {
Ok(version) => {
sink.send(Message::Text(
serde_json::to_string(&ProtocolResponse::Version(version)).unwrap(),
))
.await
.context("failed to notify informant of negotiated protocol version")?;
.context("failed to notify agent of negotiated protocol version")?;
version
}
Err(e) => {
sink.send(Message::Text(
serde_json::to_string(&ProtocolResponse::Error(format!(
"Received protocol version range {} which does not overlap with {}",
informant_range, monitor_range
agent_range, monitor_range
)))
.unwrap(),
))
.await
.context(
"failed to notify informant of no overlap between protocol version ranges",
)?;
.context("failed to notify agent of no overlap between protocol version ranges")?;
Err(e).context("error determining suitable protocol version range")?
}
};
@@ -137,7 +135,7 @@ impl Dispatcher {
.context("failed to send resources and oneshot sender across channel")
}
/// Send a message to the informant.
/// Send a message to the agent.
///
/// Although this function is small, it has one major benefit: it is the only
/// way to send data accross the connection, and you can only pass in a proper

View File

@@ -146,7 +146,7 @@ pub async fn start(args: &'static Args, token: CancellationToken) -> anyhow::Res
/// Handles incoming websocket connections.
///
/// If we are already to connected to an informant, we kill that old connection
/// If we are already to connected to an agent, we kill that old connection
/// and accept the new one.
#[tracing::instrument(name = "/monitor", skip_all, fields(?args))]
pub async fn ws_handler(
@@ -196,7 +196,7 @@ async fn start_monitor(
return;
}
};
info!("connected to informant");
info!("connected to agent");
match monitor.run().await {
Ok(()) => info!("monitor was killed due to new connection"),

View File

@@ -1,13 +1,13 @@
//! Types representing protocols and actual informant-monitor messages.
//! Types representing protocols and actual agent-monitor messages.
//!
//! The pervasive use of serde modifiers throughout this module is to ease
//! serialization on the go side. Because go does not have enums (which model
//! messages well), it is harder to model messages, and we accomodate that with
//! serde.
//!
//! *Note*: the informant sends and receives messages in different ways.
//! *Note*: the agent sends and receives messages in different ways.
//!
//! The informant serializes messages in the form and then sends them. The use
//! The agent serializes messages in the form and then sends them. The use
//! of `#[serde(tag = "type", content = "content")]` allows us to use `Type`
//! to determine how to deserialize `Content`.
//! ```ignore
@@ -25,9 +25,9 @@
//! Id uint64
//! }
//! ```
//! After reading the type field, the informant will decode the entire message
//! After reading the type field, the agent will decode the entire message
//! again, this time into the correct type using the embedded fields.
//! Because the informant cannot just extract the json contained in a certain field
//! Because the agent cannot just extract the json contained in a certain field
//! (it initially deserializes to `map[string]interface{}`), we keep the fields
//! at the top level, so the entire piece of json can be deserialized into a struct,
//! such as a `DownscaleResult`, with the `Type` and `Id` fields ignored.
@@ -37,7 +37,7 @@ use std::cmp;
use serde::{de::Error, Deserialize, Serialize};
/// A Message we send to the informant.
/// A Message we send to the agent.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct OutboundMsg {
#[serde(flatten)]
@@ -51,31 +51,31 @@ impl OutboundMsg {
}
}
/// The different underlying message types we can send to the informant.
/// The different underlying message types we can send to the agent.
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(tag = "type")]
pub enum OutboundMsgKind {
/// Indicates that the informant sent an invalid message, i.e, we couldn't
/// Indicates that the agent sent an invalid message, i.e, we couldn't
/// properly deserialize it.
InvalidMessage { error: String },
/// Indicates that we experienced an internal error while processing a message.
/// For example, if a cgroup operation fails while trying to handle an upscale,
/// we return `InternalError`.
InternalError { error: String },
/// Returned to the informant once we have finished handling an upscale. If the
/// Returned to the agent once we have finished handling an upscale. If the
/// handling was unsuccessful, an `InternalError` will get returned instead.
/// *Note*: this is a struct variant because of the way go serializes struct{}
UpscaleConfirmation {},
/// Indicates to the monitor that we are urgently requesting resources.
/// *Note*: this is a struct variant because of the way go serializes struct{}
UpscaleRequest {},
/// Returned to the informant once we have finished attempting to downscale. If
/// Returned to the agent once we have finished attempting to downscale. If
/// an error occured trying to do so, an `InternalError` will get returned instead.
/// However, if we are simply unsuccessful (for example, do to needing the resources),
/// that gets included in the `DownscaleResult`.
DownscaleResult {
// FIXME for the future (once the informant is deprecated)
// As of the time of writing, the informant/agent version of this struct is
// As of the time of writing, the agent/informant version of this struct is
// called api.DownscaleResult. This struct has uppercase fields which are
// serialized as such. Thus, we serialize using uppercase names so we don't
// have to make a breaking change to the agent<->informant protocol. Once
@@ -88,12 +88,12 @@ pub enum OutboundMsgKind {
status: String,
},
/// Part of the bidirectional heartbeat. The heartbeat is initiated by the
/// informant.
/// agent.
/// *Note*: this is a struct variant because of the way go serializes struct{}
HealthCheck {},
}
/// A message received form the informant.
/// A message received form the agent.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct InboundMsg {
#[serde(flatten)]
@@ -101,7 +101,7 @@ pub struct InboundMsg {
pub(crate) id: usize,
}
/// The different underlying message types we can receive from the informant.
/// The different underlying message types we can receive from the agent.
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(tag = "type", content = "content")]
pub enum InboundMsgKind {
@@ -120,14 +120,14 @@ pub enum InboundMsgKind {
/// when done.
DownscaleRequest { target: Resources },
/// Part of the bidirectional heartbeat. The heartbeat is initiated by the
/// informant.
/// agent.
/// *Note*: this is a struct variant because of the way go serializes struct{}
HealthCheck {},
}
/// Represents the resources granted to a VM.
#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
// Renamed because the agent/informant has multiple resources types:
// Renamed because the agent has multiple resources types:
// `Resources` (milliCPU/memory slots)
// `Allocation` (vCPU/bytes) <- what we correspond to
#[serde(rename(serialize = "Allocation", deserialize = "Allocation"))]
@@ -151,7 +151,7 @@ pub const PROTOCOL_MAX_VERSION: ProtocolVersion = ProtocolVersion::V1_0;
pub struct ProtocolVersion(u8);
impl ProtocolVersion {
/// Represents v1.0 of the informant<-> monitor protocol - the initial version
/// Represents v1.0 of the agent<-> monitor protocol - the initial version
///
/// Currently the latest version.
const V1_0: ProtocolVersion = ProtocolVersion(1);

View File

@@ -1,4 +1,4 @@
//! Exposes the `Runner`, which handles messages received from informant and
//! Exposes the `Runner`, which handles messages received from agent and
//! sends upscale requests.
//!
//! This is the "Monitor" part of the monitor binary and is the main entrypoint for
@@ -21,8 +21,8 @@ use crate::filecache::{FileCacheConfig, FileCacheState};
use crate::protocol::{InboundMsg, InboundMsgKind, OutboundMsg, OutboundMsgKind, Resources};
use crate::{bytes_to_mebibytes, get_total_system_memory, spawn_with_cancel, Args, MiB};
/// Central struct that interacts with informant, dispatcher, and cgroup to handle
/// signals from the informant.
/// Central struct that interacts with agent, dispatcher, and cgroup to handle
/// signals from the agent.
#[derive(Debug)]
pub struct Runner {
config: Config,
@@ -371,7 +371,7 @@ impl Runner {
Ok(None)
}
InboundMsgKind::InternalError { error } => {
warn!(error, id, "informant experienced an internal error");
warn!(error, id, "agent experienced an internal error");
Ok(None)
}
InboundMsgKind::HealthCheck {} => {
@@ -405,7 +405,7 @@ impl Runner {
.await
.context("failed to send message")?;
}
// there is a message from the informant
// there is a message from the agent
msg = self.dispatcher.source.next() => {
if let Some(msg) = msg {
// Don't use 'message' as a key as the string also uses
@@ -422,7 +422,7 @@ impl Runner {
// Don't use 'message' as a key as the
// string also uses that for its key
msg = ?other,
"informant should only send text messages but received different type"
"agent should only send text messages but received different type"
);
continue
},

View File

@@ -39,9 +39,10 @@
//! * Use [`PageCache::read_immutable_buf`] to get a [`ReadBufResult`].
//! * If the page was already cached, it'll be the [`ReadBufResult::Found`] variant that contains
//! a read guard for the page. Just use it.
//! * If the page was not cached, we invoke the supplied miss handler to fill the page,
//! then return [`ReadBufResult::MissFilled`].
//! After that, try again to [`PageCache::read_immutable_buf`].
//! * If the page was not cached, it'll be the [`ReadBufResult::NotFound`] variant that contains
//! a write guard for the page. Fill the page with the contents of the on-disk file.
//! Then call [`PageWriteGuard::mark_valid`] to mark the page as valid.
//! Then try again to [`PageCache::read_immutable_buf`].
//! Unless there's high cache pressure, the page should now be cached.
//! (TODO: allow downgrading the write guard to a read guard to ensure forward progress.)
//!
@@ -202,6 +203,11 @@ impl Slot {
Err(usage_count) => usage_count,
}
}
/// Sets the usage count to a specific value.
fn set_usage_count(&self, count: u8) {
self.usage_count.store(count, Ordering::Relaxed);
}
}
pub struct PageCache {
@@ -248,16 +254,82 @@ impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
}
}
///
/// PageWriteGuard is a lease on a buffer for modifying it. The page is kept locked
/// until the guard is dropped.
///
/// Counterintuitively, this is used even for a read, if the requested page is not
/// currently found in the page cache. In that case, the caller of lock_for_read()
/// is expected to fill in the page contents and call mark_valid(). Similarly
/// lock_for_write() can return an invalid buffer that the caller is expected to
/// to initialize.
///
pub struct PageWriteGuard<'i> {
inner: RwLockWriteGuard<'i, SlotInner>,
// Are the page contents currently valid?
// Used to mark pages as invalid that are assigned but not yet filled with data.
valid: bool,
}
impl std::ops::DerefMut for PageWriteGuard<'_> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.inner.buf
}
}
impl std::ops::Deref for PageWriteGuard<'_> {
type Target = [u8; PAGE_SZ];
fn deref(&self) -> &Self::Target {
self.inner.buf
}
}
impl AsMut<[u8; PAGE_SZ]> for PageWriteGuard<'_> {
fn as_mut(&mut self) -> &mut [u8; PAGE_SZ] {
self.inner.buf
}
}
impl PageWriteGuard<'_> {
/// Mark that the buffer contents are now valid.
pub fn mark_valid(&mut self) {
assert!(self.inner.key.is_some());
assert!(
!self.valid,
"mark_valid called on a buffer that was already valid"
);
self.valid = true;
}
}
impl Drop for PageWriteGuard<'_> {
///
/// If the buffer was allocated for a page that was not already in the
/// cache, but the lock_for_read/write() caller dropped the buffer without
/// initializing it, remove the mapping from the page cache.
///
fn drop(&mut self) {
assert!(self.inner.key.is_some());
if !self.valid {
let self_key = self.inner.key.as_ref().unwrap();
PAGE_CACHE.get().unwrap().remove_mapping(self_key);
self.inner.key = None;
}
}
}
/// lock_for_read() return value
pub enum ReadBufResult<'a> {
Found(PageReadGuard<'a>),
MissFilled,
NotFound(PageWriteGuard<'a>),
}
/// lock_for_write() return value
pub enum WriteBufResult {
Already,
Inserted,
pub enum WriteBufResult<'a> {
Found(PageWriteGuard<'a>),
NotFound(PageWriteGuard<'a>),
}
impl PageCache {
@@ -335,9 +407,17 @@ impl PageCache {
lsn,
};
match self.lock_for_write(&cache_key, img)? {
WriteBufResult::Already => {}
WriteBufResult::Inserted => {}
match self.lock_for_write(&cache_key)? {
WriteBufResult::Found(write_guard) => {
// We already had it in cache. Another thread must've put it there
// concurrently. Check that it had the same contents that we
// replayed.
assert!(*write_guard == img);
}
WriteBufResult::NotFound(mut write_guard) => {
write_guard.copy_from_slice(img);
write_guard.mark_valid();
}
}
Ok(())
@@ -345,39 +425,10 @@ impl PageCache {
// Section 1.2: Public interface functions for working with immutable file pages.
pub fn read_immutable_buf<M>(
&self,
file_id: FileId,
blkno: u32,
miss_handler: M,
) -> anyhow::Result<ReadBufResult>
where
M: FnOnce(&mut [u8; PAGE_SZ]) -> anyhow::Result<()>,
{
pub fn read_immutable_buf(&self, file_id: FileId, blkno: u32) -> anyhow::Result<ReadBufResult> {
let mut cache_key = CacheKey::ImmutableFilePage { file_id, blkno };
self.lock_for_read(&mut cache_key, miss_handler)
}
/// Immediately drop all buffers belonging to given file
pub fn drop_buffers_for_immutable(&self, drop_file_id: FileId) {
for slot_idx in 0..self.slots.len() {
let slot = &self.slots[slot_idx];
let mut inner = slot.inner.write().unwrap();
if let Some(key) = &inner.key {
match key {
CacheKey::ImmutableFilePage { file_id, blkno: _ }
if *file_id == drop_file_id =>
{
// remove mapping for old buffer
self.remove_mapping(key);
inner.key = None;
}
_ => {}
}
}
}
self.lock_for_read(&mut cache_key)
}
//
@@ -445,14 +496,7 @@ impl PageCache {
/// }
/// ```
///
fn lock_for_read<M, P>(
&self,
cache_key: &mut CacheKey,
miss_handler: M,
) -> anyhow::Result<ReadBufResult>
where
M: FnOnce(&mut [u8; PAGE_SZ]) -> anyhow::Result<P>,
{
fn lock_for_read(&self, cache_key: &mut CacheKey) -> anyhow::Result<ReadBufResult> {
let (read_access, hit) = match cache_key {
CacheKey::MaterializedPage { .. } => {
unreachable!("Materialized pages use lookup_materialized_page")
@@ -494,41 +538,46 @@ impl PageCache {
continue;
}
miss_handler(&mut inner.buf)?;
// Make the slot ready
let slot = &self.slots[slot_idx];
inner.key = Some(cache_key.clone());
slot.usage_count.store(1, Ordering::Relaxed);
slot.set_usage_count(1);
return Ok(ReadBufResult::MissFilled);
return Ok(ReadBufResult::NotFound(PageWriteGuard {
inner,
valid: false,
}));
}
}
/// Look up a page in the cache and lock it in write mode. If it's not
/// found, returns None.
///
/// When locking a page for writing, the search criteria is always "exact".
fn try_lock_for_write(&self, cache_key: &CacheKey) -> Option<PageWriteGuard> {
if let Some(slot_idx) = self.search_mapping_for_write(cache_key) {
// The page was found in the mapping. Lock the slot, and re-check
// that it's still what we expected (because we don't released the mapping
// lock already, another thread could have evicted the page)
let slot = &self.slots[slot_idx];
let inner = slot.inner.write().unwrap();
if inner.key.as_ref() == Some(cache_key) {
slot.inc_usage_count();
return Some(PageWriteGuard { inner, valid: true });
}
}
None
}
/// Return a write-locked buffer for given block.
///
/// Similar to lock_for_read(), but the returned buffer is write-locked and
/// may be modified by the caller even if it's already found in the cache.
fn lock_for_write<P>(&self, cache_key: &CacheKey, content: P) -> anyhow::Result<WriteBufResult>
where
P: AsRef<[u8]>,
{
fn lock_for_write(&self, cache_key: &CacheKey) -> anyhow::Result<WriteBufResult> {
loop {
// First check if the key already exists in the cache.
if let Some(slot_idx) = self.search_mapping_for_write(cache_key) {
// The page was found in the mapping. Lock the slot, and re-check
// that it's still what we expected (because we don't released the mapping
// lock already, another thread could have evicted the page)
let slot = &self.slots[slot_idx];
let inner = slot.inner.write().unwrap();
if inner.key.as_ref() == Some(cache_key) {
// We already had it in cache. Another thread must've put it there
// concurrently. Check that it had the same contents that we
// replayed.
assert_eq!(inner.buf, content.as_ref());
slot.inc_usage_count();
return Ok(WriteBufResult::Already);
}
if let Some(write_guard) = self.try_lock_for_write(cache_key) {
return Ok(WriteBufResult::Found(write_guard));
}
// Not found. Find a victim buffer
@@ -553,11 +602,12 @@ impl PageCache {
// Make the slot ready
let slot = &self.slots[slot_idx];
inner.key = Some(cache_key.clone());
slot.usage_count.store(1, Ordering::Relaxed);
slot.set_usage_count(1);
inner.buf.copy_from_slice(content.as_ref());
return Ok(WriteBufResult::Inserted);
return Ok(WriteBufResult::NotFound(PageWriteGuard {
inner,
valid: false,
}));
}
}
@@ -751,6 +801,8 @@ impl PageCache {
fn new(num_pages: usize) -> Self {
assert!(num_pages > 0, "page cache size must be > 0");
// We use Box::leak here and into_boxed_slice to avoid leaking uninitialized
// memory that Vec's might contain.
let page_buffer = Box::leak(vec![0u8; num_pages * PAGE_SZ].into_boxed_slice());
let size_metrics = &crate::metrics::PAGE_CACHE_SIZE;

View File

@@ -8,7 +8,7 @@ use crate::page_cache::{self, PageReadGuard, ReadBufResult, PAGE_SZ};
use crate::virtual_file::VirtualFile;
use bytes::Bytes;
use std::fs::File;
use std::ops::Deref;
use std::ops::{Deref, DerefMut};
use std::os::unix::fs::FileExt;
/// This is implemented by anything that can read 8 kB (PAGE_SZ)
@@ -174,11 +174,7 @@ where
let cache = page_cache::get();
loop {
match cache
.read_immutable_buf(self.file_id, blknum, |buf| {
// Read the page from disk into the buffer
self.fill_buffer(buf, blknum)?;
Ok(())
})
.read_immutable_buf(self.file_id, blknum)
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
@@ -186,7 +182,14 @@ where
)
})? {
ReadBufResult::Found(guard) => break Ok(guard.into()),
ReadBufResult::MissFilled => continue,
ReadBufResult::NotFound(mut write_guard) => {
// Read the page from disk into the buffer
self.fill_buffer(write_guard.deref_mut(), blknum)?;
write_guard.mark_valid();
// Swap for read lock
continue;
}
};
}
}

View File

@@ -8,6 +8,7 @@ use crate::virtual_file::VirtualFile;
use std::cmp::min;
use std::fs::OpenOptions;
use std::io::{self, ErrorKind};
use std::ops::DerefMut;
use std::os::unix::prelude::FileExt;
use std::path::PathBuf;
use std::sync::atomic::AtomicU64;
@@ -66,12 +67,7 @@ impl EphemeralFile {
let cache = page_cache::get();
loop {
match cache
.read_immutable_buf(self.page_cache_file_id, blknum, |buf| {
debug_assert_eq!(buf.len(), PAGE_SZ);
self.file
.read_exact_at(buf, blknum as u64 * PAGE_SZ as u64)?;
Ok(())
})
.read_immutable_buf(self.page_cache_file_id, blknum)
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
@@ -87,7 +83,13 @@ impl EphemeralFile {
page_cache::ReadBufResult::Found(guard) => {
return Ok(BlockLease::PageReadGuard(guard))
}
page_cache::ReadBufResult::MissFilled => {
page_cache::ReadBufResult::NotFound(mut write_guard) => {
let buf: &mut [u8] = write_guard.deref_mut();
debug_assert_eq!(buf.len(), PAGE_SZ);
self.file
.read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64)?;
write_guard.mark_valid();
// Swap for read lock
continue;
}
@@ -136,17 +138,18 @@ impl EphemeralFile {
match cache.read_immutable_buf(
self.ephemeral_file.page_cache_file_id,
self.blknum,
|buf| {
debug_assert_eq!(buf.len(), PAGE_SZ);
buf.copy_from_slice(&self.ephemeral_file.mutable_tail);
Ok(())
},
) {
Ok(page_cache::ReadBufResult::Found(_guard)) => {
// This function takes &mut self, so, it shouldn't be possible to reach this point.
unreachable!("we just wrote blknum {} and this function takes &mut self, so, no concurrent read_blk is possible", self.blknum);
}
Ok(page_cache::ReadBufResult::MissFilled) => {}
Ok(page_cache::ReadBufResult::NotFound(mut write_guard)) => {
let buf: &mut [u8] = write_guard.deref_mut();
debug_assert_eq!(buf.len(), PAGE_SZ);
buf.copy_from_slice(&self.ephemeral_file.mutable_tail);
write_guard.mark_valid();
// pre-warm successful
}
Err(e) => {
error!("ephemeral_file write_blob failed to get immutable buf to pre-warm page cache: {e:?}");
// fail gracefully, it's not the end of the world if we can't pre-warm the cache here
@@ -218,9 +221,8 @@ pub fn is_ephemeral_file(filename: &str) -> bool {
impl Drop for EphemeralFile {
fn drop(&mut self) {
// drop all pages from page cache
let cache = page_cache::get();
cache.drop_buffers_for_immutable(self.page_cache_file_id);
// There might still be pages in the [`crate::page_cache`] for this file.
// We leave them there, [`crate::page_cache::PageCache::find_victim`] will evict them when needed.
// unlink the file
let res = std::fs::remove_file(&self.file.path);

View File

@@ -12,25 +12,26 @@ import psycopg2.extras
# We call the test "flaky" if it failed at least once on the main branch in the last N=10 days.
FLAKY_TESTS_QUERY = """
SELECT
DISTINCT parent_suite, suite, test
DISTINCT parent_suite, suite, REGEXP_REPLACE(test, '(release|debug)-pg(\\d+)-?', '') as deparametrized_test
FROM
(
SELECT
revision,
jsonb_array_elements(data -> 'children') -> 'name' as parent_suite,
jsonb_array_elements(jsonb_array_elements(data -> 'children') -> 'children') -> 'name' as suite,
jsonb_array_elements(jsonb_array_elements(jsonb_array_elements(data -> 'children') -> 'children') -> 'children') -> 'name' as test,
jsonb_array_elements(jsonb_array_elements(jsonb_array_elements(data -> 'children') -> 'children') -> 'children') -> 'status' as status,
jsonb_array_elements(jsonb_array_elements(jsonb_array_elements(data -> 'children') -> 'children') -> 'children') -> 'retriesStatusChange' as retries_status_change,
to_timestamp((jsonb_array_elements(jsonb_array_elements(jsonb_array_elements(data -> 'children') -> 'children') -> 'children') -> 'time' -> 'start')::bigint / 1000)::date as timestamp
reference,
jsonb_array_elements(data -> 'children') ->> 'name' as parent_suite,
jsonb_array_elements(jsonb_array_elements(data -> 'children') -> 'children') ->> 'name' as suite,
jsonb_array_elements(jsonb_array_elements(jsonb_array_elements(data -> 'children') -> 'children') -> 'children') ->> 'name' as test,
jsonb_array_elements(jsonb_array_elements(jsonb_array_elements(data -> 'children') -> 'children') -> 'children') ->> 'status' as status,
jsonb_array_elements(jsonb_array_elements(jsonb_array_elements(data -> 'children') -> 'children') -> 'children') ->> 'retriesStatusChange' as retries_status_change,
to_timestamp((jsonb_array_elements(jsonb_array_elements(jsonb_array_elements(data -> 'children') -> 'children') -> 'children') -> 'time' ->> 'start')::bigint / 1000)::date as timestamp
FROM
regress_test_results
WHERE
reference = 'refs/heads/main'
) data
WHERE
timestamp > CURRENT_DATE - INTERVAL '%s' day
AND (status::text IN ('"failed"', '"broken"') OR retries_status_change::boolean)
AND (
(status IN ('failed', 'broken') AND reference = 'refs/heads/main')
OR retries_status_change::boolean
)
;
"""
@@ -40,6 +41,9 @@ def main(args: argparse.Namespace):
interval_days = args.days
output = args.output
build_type = args.build_type
pg_version = args.pg_version
res: DefaultDict[str, DefaultDict[str, Dict[str, bool]]]
res = defaultdict(lambda: defaultdict(dict))
@@ -55,8 +59,21 @@ def main(args: argparse.Namespace):
rows = []
for row in rows:
logging.info(f"\t{row['parent_suite'].replace('.', '/')}/{row['suite']}.py::{row['test']}")
res[row["parent_suite"]][row["suite"]][row["test"]] = True
# We don't want to automatically rerun tests in a performance suite
if row["parent_suite"] != "test_runner.regress":
continue
deparametrized_test = row["deparametrized_test"]
dash_if_needed = "" if deparametrized_test.endswith("[]") else "-"
parametrized_test = deparametrized_test.replace(
"[",
f"[{build_type}-pg{pg_version}{dash_if_needed}",
)
res[row["parent_suite"]][row["suite"]][parametrized_test] = True
logging.info(
f"\t{row['parent_suite'].replace('.', '/')}/{row['suite']}.py::{parametrized_test}"
)
logging.info(f"saving results to {output.name}")
json.dump(res, output, indent=2)
@@ -77,6 +94,18 @@ if __name__ == "__main__":
type=int,
help="how many days to look back for flaky tests (default: 10)",
)
parser.add_argument(
"--build-type",
required=True,
type=str,
help="for which build type to create list of flaky tests (debug or release)",
)
parser.add_argument(
"--pg-version",
required=True,
type=int,
help="for which Postgres version to create list of flaky tests (14, 15, etc.)",
)
parser.add_argument(
"connstr",
help="connection string to the test results database",

View File

@@ -233,10 +233,19 @@ if TYPE_CHECKING:
def assert_prefix_empty(neon_env_builder: "NeonEnvBuilder", prefix: Optional[str] = None):
response = list_prefix(neon_env_builder, prefix)
objects = response.get("Contents")
assert (
response["KeyCount"] == 0
), f"remote dir with prefix {prefix} is not empty after deletion: {objects}"
keys = response["KeyCount"]
objects = response.get("Contents", [])
if keys != 0 and len(objects) == 0:
# this has been seen in one case with mock_s3:
# https://neon-github-public-dev.s3.amazonaws.com/reports/pr-4938/6000769714/index.html#suites/3556ed71f2d69272a7014df6dcb02317/ca01e4f4d8d9a11f
# looking at moto impl, it might be there's a race with common prefix (sub directory) not going away with deletes
common_prefixes = response.get("CommonPrefixes", [])
log.warn(
f"contradicting ListObjectsV2 response with KeyCount={keys} and Contents={objects}, CommonPrefixes={common_prefixes}"
)
assert keys == 0, f"remote dir with prefix {prefix} is not empty after deletion: {objects}"
def assert_prefix_not_empty(neon_env_builder: "NeonEnvBuilder", prefix: Optional[str] = None):

View File

@@ -1,12 +0,0 @@
# Configuration for cgroups in VM compute nodes
group neon-postgres {
perm {
admin {
uid = vm-informant;
}
task {
gid = users;
}
}
memory {}
}