Compare commits

..

7 Commits

Author SHA1 Message Date
John Spray
422310c19b slightly more efficient 2025-02-17 22:43:36 +01:00
John Spray
77b1fd40b5 wip prototype 2025-02-17 22:37:46 +01:00
John Spray
a8f59f851d Revert "tests: broaden allow-list for #10720 workaround (#10807)"
This reverts commit ae463f366b.
2025-02-17 22:08:03 +01:00
Alexander Bayandin
27241f039c test_runner: fix neon_local usage for version mismatch tests (#10859)
## Problem

Tests with mixed versions of binaries always pick up new versions if
services are started using `neon_local`.

## Summary of changes
- Set `neon_local_binpath` along with `neon_binpath` and
`pg_distrib_dir` for tests with mixed versions
2025-02-17 20:29:14 +00:00
Heikki Linnakangas
811506aaa2 fast_import: Use rust s3 client for uploading (#10777)
This replaces the use of the awscli utility. awscli binary is massive,
it added about 200 MB to the docker image size, while the s3 client was
already a dependency so using that is essentially free, as far as binary
size is concerned.

I implemented a simple upload function that tries to keep 10 uploads
going in parallel. I believe that's the default behavior of the "aws s3
sync" command too.
2025-02-17 20:07:31 +00:00
Heikki Linnakangas
2884917bd4 compute: Allow postgres user to power off the VM also on <= v16 (#10860)
I did this for debian bookworm variant in PR #10710, but forgot to
update the "bullseye" dockerfile that is used to build older PostgreSQL
versions.
2025-02-17 19:42:57 +00:00
Tristan Partin
b34598516f Warn when PR may require regenerating cloud PG settings (#10229)
These generated Postgres settings JSON files can get out of sync causing
the control plane to reject updated to an endpoint or project's Postgres
settings.

Signed-off-by: Tristan Partin <tristan@neon.tech>
2025-02-17 19:02:16 +00:00
17 changed files with 244 additions and 73 deletions

View File

@@ -0,0 +1,41 @@
name: Regenerate Postgres Settings
on:
pull_request:
types:
- opened
- synchronize
- reopened
paths:
- pgxn/neon/**.c
- vendor/postgres-v*
- vendor/revisions.json
concurrency:
group: ${{ github.workflow }}-${{ github.head_ref }}
cancel-in-progress: true
permissions:
pull-requests: write
jobs:
regenerate-pg-settings:
runs-on: ubuntu-22.04
steps:
- name: Add comment
uses: thollander/actions-comment-pull-request@v3
with:
comment-tag: ${{ github.job }}
pr-number: ${{ github.event.number }}
message: |
If this PR added a GUC in the Postgres fork or `neon` extension,
please regenerate the Postgres settings in the `cloud` repo:
```
make NEON_WORKDIR=path/to/neon/checkout \
-C goapp/internal/shareddomain/postgres generate
```
If you're an external contributor, a Neon employee will assist in
making sure this step is done.

2
Cargo.lock generated
View File

@@ -1303,6 +1303,7 @@ dependencies = [
"aws-config",
"aws-sdk-kms",
"aws-sdk-s3",
"aws-smithy-types",
"axum",
"base64 0.13.1",
"bytes",
@@ -1351,6 +1352,7 @@ dependencies = [
"utils",
"uuid",
"vm_monitor",
"walkdir",
"workspace_hack",
"zstd",
]

View File

@@ -1695,29 +1695,6 @@ RUN if [ "$TARGETARCH" = "amd64" ]; then\
&& echo "${pgbouncer_exporter_sha256} pgbouncer_exporter" | sha256sum -c -\
&& echo "${sql_exporter_sha256} sql_exporter" | sha256sum -c -
#########################################################################################
#
# Layer "awscli"
#
#########################################################################################
FROM build-deps AS awscli
ARG TARGETARCH
RUN set -ex; \
if [ "${TARGETARCH}" = "amd64" ]; then \
TARGETARCH_ALT="x86_64"; \
CHECKSUM="c9a9df3770a3ff9259cb469b6179e02829687a464e0824d5c32d378820b53a00"; \
elif [ "${TARGETARCH}" = "arm64" ]; then \
TARGETARCH_ALT="aarch64"; \
CHECKSUM="8181730be7891582b38b028112e81b4899ca817e8c616aad807c9e9d1289223a"; \
else \
echo "Unsupported architecture: ${TARGETARCH}"; exit 1; \
fi; \
curl --retry 5 -L "https://awscli.amazonaws.com/awscli-exe-linux-${TARGETARCH_ALT}-2.17.5.zip" -o /tmp/awscliv2.zip; \
echo "${CHECKSUM} /tmp/awscliv2.zip" | sha256sum -c -; \
unzip /tmp/awscliv2.zip -d /tmp/awscliv2; \
/tmp/awscliv2/aws/install; \
rm -rf /tmp/awscliv2.zip /tmp/awscliv2
#########################################################################################
#
# Clean up postgres folder before inclusion
@@ -1887,9 +1864,6 @@ RUN mkdir /var/db && useradd -m -d /var/db/postgres postgres && \
mkdir /usr/local/download_extensions && \
chown -R postgres:postgres /usr/local/download_extensions
# aws cli is used by fast_import
COPY --from=awscli /usr/local/aws-cli /usr/local/aws-cli
# pgbouncer and its config
COPY --from=pgbouncer /usr/local/pgbouncer/bin/pgbouncer /usr/local/bin/pgbouncer
COPY --chmod=0666 --chown=postgres compute/etc/pgbouncer.ini /etc/pgbouncer.ini

View File

@@ -47,7 +47,9 @@ files:
# Allow postgres user (which is what compute_ctl runs as) to run /neonvm/bin/resize-swap
# and /neonvm/bin/set-disk-quota as root without requiring entering a password (NOPASSWD),
# regardless of hostname (ALL)
postgres ALL=(root) NOPASSWD: /neonvm/bin/resize-swap, /neonvm/bin/set-disk-quota
#
# Also allow it to shut down the VM. The fast_import job does that when it's finished.
postgres ALL=(root) NOPASSWD: /neonvm/bin/resize-swap, /neonvm/bin/set-disk-quota, /neonvm/bin/poweroff
- filename: cgconfig.conf
content: |
# Configuration for cgroups in VM compute nodes

View File

@@ -14,6 +14,7 @@ base64.workspace = true
aws-config.workspace = true
aws-sdk-s3.workspace = true
aws-sdk-kms.workspace = true
aws-smithy-types.workspace = true
anyhow.workspace = true
axum = { workspace = true, features = [] }
camino.workspace = true
@@ -54,6 +55,7 @@ thiserror.workspace = true
url.workspace = true
uuid.workspace = true
prometheus.workspace = true
walkdir.workspace = true
postgres_initdb.workspace = true
compute_api.workspace = true

View File

@@ -421,6 +421,7 @@ async fn run_dump_restore(
#[allow(clippy::too_many_arguments)]
async fn cmd_pgdata(
s3_client: Option<aws_sdk_s3::Client>,
kms_client: Option<aws_sdk_kms::Client>,
maybe_s3_prefix: Option<s3_uri::S3Uri>,
maybe_spec: Option<Spec>,
@@ -488,9 +489,13 @@ async fn cmd_pgdata(
// Only sync if s3_prefix was specified
if let Some(s3_prefix) = maybe_s3_prefix {
info!("upload pgdata");
aws_s3_sync::sync(Utf8Path::new(&pgdata_dir), &s3_prefix.append("/pgdata/"))
.await
.context("sync dump directory to destination")?;
aws_s3_sync::upload_dir_recursive(
s3_client.as_ref().unwrap(),
Utf8Path::new(&pgdata_dir),
&s3_prefix.append("/pgdata/"),
)
.await
.context("sync dump directory to destination")?;
info!("write status");
{
@@ -499,9 +504,13 @@ async fn cmd_pgdata(
let status_file = status_dir.join("pgdata");
std::fs::write(&status_file, serde_json::json!({"done": true}).to_string())
.context("write status file")?;
aws_s3_sync::sync(&status_dir, &s3_prefix.append("/status/"))
.await
.context("sync status directory to destination")?;
aws_s3_sync::upload_dir_recursive(
s3_client.as_ref().unwrap(),
&status_dir,
&s3_prefix.append("/status/"),
)
.await
.context("sync status directory to destination")?;
}
}
@@ -573,18 +582,20 @@ pub(crate) async fn main() -> anyhow::Result<()> {
let args = Args::parse();
// Initialize AWS clients only if s3_prefix is specified
let (aws_config, kms_client) = if args.s3_prefix.is_some() {
let (s3_client, kms_client) = if args.s3_prefix.is_some() {
let config = aws_config::load_defaults(BehaviorVersion::v2024_03_28()).await;
let s3_client = aws_sdk_s3::Client::new(&config);
let kms = aws_sdk_kms::Client::new(&config);
(Some(config), Some(kms))
(Some(s3_client), Some(kms))
} else {
(None, None)
};
let spec: Option<Spec> = if let Some(s3_prefix) = &args.s3_prefix {
let spec_key = s3_prefix.append("/spec.json");
let s3_client = aws_sdk_s3::Client::new(aws_config.as_ref().unwrap());
let object = s3_client
.as_ref()
.unwrap()
.get_object()
.bucket(&spec_key.bucket)
.key(spec_key.key)
@@ -624,6 +635,7 @@ pub(crate) async fn main() -> anyhow::Result<()> {
memory_mb,
} => {
cmd_pgdata(
s3_client,
kms_client,
args.s3_prefix,
spec,

View File

@@ -1,24 +1,102 @@
use anyhow::Context;
use camino::Utf8Path;
use camino::{Utf8Path, Utf8PathBuf};
use tokio::task::JoinSet;
use walkdir::WalkDir;
use super::s3_uri::S3Uri;
pub(crate) async fn sync(local: &Utf8Path, remote: &S3Uri) -> anyhow::Result<()> {
let mut builder = tokio::process::Command::new("aws");
builder
.arg("s3")
.arg("sync")
.arg(local.as_str())
.arg(remote.to_string());
let st = builder
.spawn()
.context("spawn aws s3 sync")?
.wait()
.await
.context("wait for aws s3 sync")?;
if st.success() {
Ok(())
} else {
Err(anyhow::anyhow!("aws s3 sync failed"))
use tracing::{info, warn};
const MAX_PARALLEL_UPLOADS: usize = 10;
/// Upload all files from 'local' to 'remote'
pub(crate) async fn upload_dir_recursive(
s3_client: &aws_sdk_s3::Client,
local: &Utf8Path,
remote: &S3Uri,
) -> anyhow::Result<()> {
// Recursively scan directory
let mut dirwalker = WalkDir::new(local)
.into_iter()
.map(|entry| {
let entry = entry?;
let file_type = entry.file_type();
let path = <&Utf8Path>::try_from(entry.path())?.to_path_buf();
Ok((file_type, path))
})
.filter_map(|e: anyhow::Result<(std::fs::FileType, Utf8PathBuf)>| {
match e {
Ok((file_type, path)) if file_type.is_file() => Some(Ok(path)),
Ok((file_type, _path)) if file_type.is_dir() => {
// The WalkDir iterator will recurse into directories, but we don't want
// to do anything with directories as such. There's no concept of uploading
// an empty directory to S3.
None
}
Ok((file_type, path)) if file_type.is_symlink() => {
// huh, didn't expect a symlink. Can't upload that to S3. Warn and skip.
warn!("cannot upload symlink ({})", path);
None
}
Ok((_file_type, path)) => {
// should not happen
warn!("directory entry has unexpected type ({})", path);
None
}
Err(e) => Some(Err(e)),
}
});
// Spawn upload tasks for each file, keeping MAX_PARALLEL_UPLOADS active in
// parallel.
let mut joinset = JoinSet::new();
loop {
// Could we upload more?
while joinset.len() < MAX_PARALLEL_UPLOADS {
if let Some(full_local_path) = dirwalker.next() {
let full_local_path = full_local_path?;
let relative_local_path = full_local_path
.strip_prefix(local)
.expect("all paths start from the walkdir root");
let remote_path = remote.append(relative_local_path.as_str());
info!(
"starting upload of {} to {}",
&full_local_path, &remote_path
);
let upload_task = upload_file(s3_client.clone(), full_local_path, remote_path);
joinset.spawn(upload_task);
} else {
info!("draining upload tasks");
break;
}
}
// Wait for an upload to complete
if let Some(res) = joinset.join_next().await {
let _ = res?;
} else {
// all done!
break;
}
}
Ok(())
}
pub(crate) async fn upload_file(
s3_client: aws_sdk_s3::Client,
local_path: Utf8PathBuf,
remote: S3Uri,
) -> anyhow::Result<()> {
use aws_smithy_types::byte_stream::ByteStream;
let stream = ByteStream::from_path(&local_path).await?;
let _result = s3_client
.put_object()
.bucket(remote.bucket)
.key(&remote.key)
.body(stream)
.send()
.await?;
info!("upload of {} to {} finished", &local_path, &remote.key);
Ok(())
}

View File

@@ -570,8 +570,12 @@ impl LayerMap {
self.historic.iter()
}
pub fn riter_historic_layers(&self) -> impl '_ + Iterator<Item = Arc<PersistentLayerDesc>> {
self.historic.riter()
}
/// Get a ref counted pointer for the first in memory layer that matches the provided predicate.
pub fn find_in_memory_layer<Pred>(&self, mut pred: Pred) -> Option<Arc<InMemoryLayer>>
pub(crate) fn find_in_memory_layer<Pred>(&self, mut pred: Pred) -> Option<Arc<InMemoryLayer>>
where
Pred: FnMut(&Arc<InMemoryLayer>) -> bool,
{
@@ -900,6 +904,24 @@ impl LayerMap {
Ok(())
}
/// Efficiency: this is a single btreemap walk to the end of the map in the common case where
/// we are queried for image layers after the start of an ephemeral layer. In the general case
/// where we are called with some arbitrary LSN, this function is O(N) -- so don't use it like that.
pub(crate) fn get_newest_image_after(&self, lsn: Lsn) -> Option<Arc<PersistentLayerDesc>> {
// TODO: an efficient equivalent, this is a crude placeholder
for layer in self.riter_historic_layers() {
if !layer.is_delta() && layer.image_layer_lsn() >= lsn {
return Some(layer);
}
if layer.lsn_range.start < lsn {
// We are past the layers that could possibly intersect with the requested bound
break;
}
}
None
}
/// `read_points` represent the tip of a timeline and any branch points, i.e. the places
/// where we expect to serve reads.
///

View File

@@ -509,6 +509,18 @@ impl<Value: Clone> BufferedHistoricLayerCoverage<Value> {
self.layers.values().cloned()
}
/// Iterate all the layers in reverse order (newest LSNs first)
pub fn riter(&self) -> impl '_ + Iterator<Item = Value> {
// NOTE we can actually perform this without rebuilding,
// but it's not necessary for now.
if !self.buffer.is_empty() {
panic!("rebuild pls")
}
// TODO: is cloned() really needed?
self.layers.values().rev().cloned()
}
/// Return a reference to a queryable map, assuming all updates
/// have already been processed using self.rebuild()
pub fn get(&self) -> anyhow::Result<&HistoricLayerCoverage<Value>> {

View File

@@ -3775,6 +3775,8 @@ impl Timeline {
let mut completed_keyspace = KeySpace::default();
let mut image_covered_keyspace = KeySpaceRandomAccum::new();
let mut in_memory_layers_considered = Vec::new();
// Prevent GC from progressing while visiting the current timeline.
// If we are GC-ing because a new image layer was added while traversing
// the timeline, then it will remove layers that are required for fulfilling
@@ -3810,12 +3812,34 @@ impl Timeline {
let in_memory_layer = layers.find_in_memory_layer(|l| {
let start_lsn = l.get_lsn_range().start;
cont_lsn > start_lsn
!in_memory_layers_considered.contains(&start_lsn) && cont_lsn > start_lsn
});
match in_memory_layer {
Some(l) => {
let lsn_range = l.get_lsn_range().start..cont_lsn;
in_memory_layers_considered.push(l.get_lsn_range().start);
// Search for image layers that overlap with the in-memory layer: this is rare but permitted, and
// we must bound the `lsn_range` of this layer to avoid skipping past the image layer.
// TODO: a narrower search that only hits on image layers matching `unmapped_keyspace`
let lsn_range = if let Some(image) =
layers.get_newest_image_after(l.get_lsn_range().start)
{
// Note that this does not guarantee serving a read from an image layer, just that we will
// not skip considering thge image layer in our Fringe. We can still end up doing walredo work
// in spite of the presence of an image layer, if the inmemory layers we visit contain enough
// information to fully construct a page. For example:
// - ephemeral layer contains I1, D1, D2, <LSN X>
// - image layer at LSN X contains image equal to I2
// - we will end up doing a walredo of I1+D1+D2, rather than reading from the image layer
//
// This is not a problem for correctness, and is rare enough that the wasted time doing walredo
// doesn't matter.
image.get_lsn_range().start + 1..cont_lsn
} else {
l.get_lsn_range().start..cont_lsn
};
fringe.update(
ReadableLayer::InMemoryLayer(l),
unmapped_keyspace.clone(),

View File

@@ -1019,8 +1019,6 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> ReplyReader<IO> {
self.ws_guard
.walsenders
.record_ps_feedback(self.ws_guard.id, &ps_feedback);
// This here is the principal spot that updates remote_consistent_lsn.
// The other spot where it's updated from peers will go away.
self.tli
.update_remote_consistent_lsn(ps_feedback.remote_consistent_lsn)
.await;

View File

@@ -222,13 +222,11 @@ impl StateSK {
state.inmem.backup_lsn = max(Lsn(sk_info.backup_lsn), state.inmem.backup_lsn);
sync_control_file |= state.backup_lsn + wal_seg_size < state.inmem.backup_lsn;
// This will go away in the new world.
state.inmem.remote_consistent_lsn = max(
Lsn(sk_info.remote_consistent_lsn),
state.inmem.remote_consistent_lsn,
);
sync_control_file |=
// WTF is this criteria?
state.remote_consistent_lsn + wal_seg_size < state.inmem.remote_consistent_lsn;
state.inmem.peer_horizon_lsn =

View File

@@ -503,8 +503,7 @@ impl Manager {
) {
let is_active = is_wal_backup_required
|| num_computes > 0
// state.remote_consistent_lsn is the inmemo remote consistent lsn
|| state.remote_consistent_lsn < state.commit_lsn; // this here changes with the design;
|| state.remote_consistent_lsn < state.commit_lsn;
// update the broker timeline set
if self.tli_broker_active.set(is_active) {
@@ -517,7 +516,7 @@ impl Manager {
MANAGER_ACTIVE_CHANGES.inc();
}
// update the state in Arc<Timeline> (which is only used for informational APIs)
// update the state in Arc<Timeline>
self.tli
.broker_active
.store(is_active, std::sync::atomic::Ordering::Relaxed);

View File

@@ -491,6 +491,7 @@ class NeonEnvBuilder:
self.test_may_use_compatibility_snapshot_binaries = False
self.version_combination = combination
self.mixdir = self.test_output_dir / "mixdir_neon"
if self.version_combination is not None:
assert (
self.compatibility_neon_binpath is not None
@@ -702,6 +703,11 @@ class NeonEnvBuilder:
def _mix_versions(self):
assert self.version_combination is not None, "version combination must be set"
# Always use a newer version of `neon_local`
(self.mixdir / "neon_local").symlink_to(self.neon_binpath / "neon_local")
self.neon_local_binpath = self.mixdir
for component, paths in COMPONENT_BINARIES.items():
directory = (
self.neon_binpath
@@ -711,9 +717,10 @@ class NeonEnvBuilder:
for filename in paths:
destination = self.mixdir / filename
destination.symlink_to(directory / filename)
self.neon_binpath = self.mixdir
if self.version_combination["compute"] == "old":
self.pg_distrib_dir = self.compatibility_pg_distrib_dir
self.neon_binpath = self.mixdir
def overlay_mount(self, ident: str, srcdir: Path, dstdir: Path):
"""

View File

@@ -52,11 +52,11 @@ COMPONENT_BINARIES = {
# Disable auto-formatting for better readability
# fmt: off
VERSIONS_COMBINATIONS = (
{"storage_controller": "new", "storage_broker": "new", "compute": "new", "safekeeper": "new", "pageserver": "new"},
{"storage_controller": "new", "storage_broker": "new", "compute": "old", "safekeeper": "old", "pageserver": "old"},
{"storage_controller": "new", "storage_broker": "new", "compute": "old", "safekeeper": "old", "pageserver": "new"},
{"storage_controller": "new", "storage_broker": "new", "compute": "old", "safekeeper": "new", "pageserver": "new"},
{"storage_controller": "old", "storage_broker": "old", "compute": "new", "safekeeper": "new", "pageserver": "new"},
{"storage_controller": "new", "storage_broker": "new", "compute": "new", "safekeeper": "new", "pageserver": "new"}, # combination: nnnnn
{"storage_controller": "new", "storage_broker": "new", "compute": "old", "safekeeper": "old", "pageserver": "old"}, # combination: ooonn
{"storage_controller": "new", "storage_broker": "new", "compute": "old", "safekeeper": "old", "pageserver": "new"}, # combination: ononn
{"storage_controller": "new", "storage_broker": "new", "compute": "old", "safekeeper": "new", "pageserver": "new"}, # combination: onnnn
{"storage_controller": "old", "storage_broker": "old", "compute": "new", "safekeeper": "new", "pageserver": "new"}, # combination: nnnoo
)
# fmt: on

View File

@@ -1821,7 +1821,7 @@ def test_sharding_gc(
# TODO: remove when https://github.com/neondatabase/neon/issues/10720 is fixed
ps.allowed_errors.extend(
[
".*could not find data for key.*",
".*could not find data for key 020000000000000000000000000000000000.*",
".*could not ingest record.*",
]
)

View File

@@ -318,7 +318,7 @@ def test_scrubber_physical_gc_ancestors(neon_env_builder: NeonEnvBuilder, shard_
# TODO: remove when https://github.com/neondatabase/neon/issues/10720 is fixed
ps.allowed_errors.extend(
[
".*could not find data for key.*",
".*could not find data for key 020000000000000000000000000000000000.*",
".*could not ingest record.*",
]
)