Compare commits


13 Commits

Author SHA1 Message Date
Anastasia Lubennikova
efaf2c6663 RFC Merged compute image 2024-04-02 16:12:12 +01:00
Alexander Bayandin
90a8ff55fa CI(benchmarking): Add Sharded Tenant for pgbench (#7186)
## Problem

During Nightly Benchmarks, we want to collect pgbench results for
sharded tenants as well.

## Summary of changes
- Add pre-created sharded project for pgbench
2024-04-02 14:39:24 +01:00
macdoos
3b95e8072a test_runner: replace all .format() with f-strings (#7194) 2024-04-02 14:32:14 +01:00
Conrad Ludgate
8ee54ffd30 update tokio 1.37 (#7276)
## Problem

## Summary of changes

`cargo update -p tokio`.

The only risky change I could see is `tokio::io::split` moving from a spin-lock to a mutex, but I think that's ok.
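
For context, a minimal sketch (not part of this diff) of the API whose internals changed: `tokio::io::split` turns one bidirectional stream into read and write halves that coordinate through an internal lock, which is the lock that moved from a spin-lock to a mutex.

```rust
// Illustrative only: assumes the `tokio` crate with its io-util, rt and macros features.
use tokio::io::{AsyncReadExt, AsyncWriteExt};

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // An in-memory duplex pipe stands in for a real TCP connection.
    let (client, mut server) = tokio::io::duplex(64);

    // `split` returns halves that can be used from different tasks; they
    // synchronize through the internal lock mentioned above.
    let (mut rd, mut wr) = tokio::io::split(client);

    wr.write_all(b"ping").await?;
    server.write_all(b"pong").await?;

    let mut buf = [0u8; 4];
    rd.read_exact(&mut buf).await?;
    assert_eq!(&buf, b"pong");
    Ok(())
}
```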
2024-04-02 10:12:54 +01:00
Alex Chi Z
3ab9f56f5f fixup(#7278/compute_ctl): remote extension download permission (#7280)
Fix #7278 

## Summary of changes

* Explicitly create the extension download directory and assign correct permissions.
* Fix the problem where an extension download failure would cause all future downloads to fail.

Signed-off-by: Alex Chi Z <chi@neon.tech>
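
A hedged sketch of the second point, using hypothetical names (the real logic lives in `compute_ctl` and uses its own types): a download is only recorded as complete when the fetch succeeded, so one failure does not poison later attempts.

```rust
// Illustration of "record success only on success"; names are hypothetical.
use std::collections::HashMap;
use std::sync::RwLock;
use std::time::SystemTime;

struct ExtensionDownloads {
    // archive name -> (when the download started, whether it finished successfully)
    progress: RwLock<HashMap<String, (SystemTime, bool)>>,
}

impl ExtensionDownloads {
    fn download(&self, archive: &str) -> Result<u64, String> {
        let started = SystemTime::now();
        let result = fake_fetch(archive); // stand-in for the real remote fetch

        // Only record the archive as downloaded if the fetch succeeded; a failed
        // attempt must stay "not downloaded" so a later request retries it.
        if result.is_ok() {
            self.progress
                .write()
                .expect("lock poisoned")
                .insert(archive.to_string(), (started, true));
        }
        result
    }
}

fn fake_fetch(archive: &str) -> Result<u64, String> {
    if archive.is_empty() {
        Err("empty archive name".to_string())
    } else {
        Ok(42) // pretend 42 bytes were downloaded
    }
}

fn main() {
    let downloads = ExtensionDownloads {
        progress: RwLock::new(HashMap::new()),
    };
    assert!(downloads.download("postgis.tar.zst").is_ok());
    assert!(downloads.download("").is_err());
}
```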
2024-03-29 17:59:30 +00:00
Alex Chi Z
7ddc7b4990 neonvm: add LFC approximate working set size to metrics (#7252)
ref https://github.com/neondatabase/autoscaling/pull/878
ref https://github.com/neondatabase/autoscaling/issues/872

Add `approximate_working_set_size` to sql exporter so that autoscaling
can use it in the future.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
Co-authored-by: Peter Bendel <peterbendel@neon.tech>
2024-03-29 12:11:17 -04:00
John Spray
63213fc814 storage controller: scheduling optimization for sharded tenants (#7181)
## Problem

- When we scheduled locations, we were doing it without any context
about other shards in the same tenant
- After a shard split, there wasn't an automatic mechanism to migrate
the attachments away from the split location
- After a shard split and the migration away from the split location,
there wasn't an automatic mechanism to pick new secondary locations so
that the end state has no concentration of locations on the nodes where
the split happened.

Partially completes: https://github.com/neondatabase/neon/issues/7139

## Summary of changes

- Scheduler now takes a `ScheduleContext` object that can be populated
with information about other shards
- During tenant creation and shard split, we incrementally build up the
ScheduleContext, updating it for each shard as we proceed.
- When scheduling new locations, the ScheduleContext is used to apply a
soft anti-affinity to nodes where a tenant already has shards.
- The background reconciler task now has an extra phase `optimize_all`,
which runs only if the primary `reconcile_all` phase didn't generate any
work. The separation is that `reconcile_all` is needed for availability,
but optimize_all is purely "nice to have" work that balances load across the nodes.
- optimize_all calls into two new TenantState methods called
optimize_attachment and optimize_secondary, which seek out opportunities
to improve placement:
- optimize_attachment: if the node where we're currently attached has an
excess of attached shard locations for this tenant compared with the
node where we have a secondary location, then cut over to the secondary
location.
- optimize_secondary: if the node holding our secondary location has an
excessive number of locations for this tenant compared with some other
node where we don't currently have a location, then create a new
secondary location on that other node.
- a new debug API endpoint is provided to run background tasks on-demand. It returns the number of reconciliations in progress, so callers can keep calling until they get `0`, advancing the system to its final state without waiting for many iterations of the background task.

Optimization is run at an implicitly low priority by:
- Omitting the phase entirely if reconcile_all has work to do
- Skipping optimization of any tenant that has reconciles in flight
- Limiting the total number of optimizations that will be run from one
call to optimize_all to a constant (currently 2).

The idea of this low-priority execution is to minimize the operational
risk that optimization work overloads any part of the system. It also happens
to make the system easier to observe and debug, as we avoid running
large numbers of concurrent changes. Eventually we may relax these
limitations: there is no correctness problem with optimizing lots of
tenants concurrently, and optimizing multiple shards in one tenant just
requires housekeeping changes to update the ScheduleContext with the result of
one optimization before proceeding to the next shard.
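
A minimal sketch of the soft anti-affinity scheduling described above, with simplified names (the real `ScheduleContext`/`Scheduler` live in the storage controller and track more state): nodes already holding shards of the tenant get a higher score and are only picked when nothing better is available, while `hard_exclude` remains an absolute constraint.

```rust
// Illustrative sketch of soft anti-affinity scheduling; not the storage
// controller's actual types.
use std::collections::HashMap;

type NodeId = u64;

#[derive(Default)]
struct ScheduleContext {
    // How many shards of the tenant being scheduled already live on each node.
    affinity: HashMap<NodeId, usize>,
}

impl ScheduleContext {
    fn avoid(&mut self, nodes: &[NodeId]) {
        for n in nodes {
            *self.affinity.entry(*n).or_default() += 1;
        }
    }
}

/// Pick a node: `hard_exclude` is a hard constraint (never use these nodes);
/// the context is a soft constraint (prefer nodes with fewer shards of this tenant).
fn schedule_shard(
    nodes: &[NodeId],
    hard_exclude: &[NodeId],
    context: &ScheduleContext,
) -> Option<NodeId> {
    nodes
        .iter()
        .copied()
        .filter(|n| !hard_exclude.contains(n))
        .min_by_key(|n| context.affinity.get(n).copied().unwrap_or(0))
}

fn main() {
    let nodes = [1, 2, 3];
    let mut ctx = ScheduleContext::default();
    // Shard 0 of the tenant was already placed on node 1.
    ctx.avoid(&[1]);
    // Shard 1 avoids node 1 thanks to the soft anti-affinity, but would still
    // use it if nodes 2 and 3 were hard-excluded.
    assert_eq!(schedule_shard(&nodes, &[], &ctx), Some(2));
    assert_eq!(schedule_shard(&nodes, &[2, 3], &ctx), Some(1));
}
```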
2024-03-28 18:48:52 +00:00
Vlad Lazar
090123a429 pageserver: check for new image layers based on ingested WAL (#7230)
## Problem
Part of the legacy (but current) compaction algorithm is to find a stack
of overlapping delta layers which will be turned
into an image layer. This operation is exponential in terms of the
number of matching layers and we do it roughly every 20 seconds.

## Summary of changes
Only check whether a new image layer is required once we've ingested a certain
amount of WAL since the last check.
The amount of WAL is expressed in multiples of checkpoint distance, the
intuition being that there's little point doing the check if we only have two
new L1 layers (not enough to create a new image).
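
A sketch of that gate under simplified assumptions (plain byte offsets stand in for LSNs; the real check lives in the pageserver's `Timeline::time_for_new_image_layer`, shown further down in this diff):

```rust
// Sketch of the "enough WAL since the last check?" gate. The constant mirrors
// the new image_layer_creation_check_threshold knob added by this PR.
struct ImageLayerCheckGate {
    checkpoint_distance: u64,
    check_threshold: u64, // multiples of checkpoint_distance, default 2 in this PR
    last_check_at: u64,   // WAL position of the previous check
}

impl ImageLayerCheckGate {
    /// Returns true if enough WAL has been ingested since the last check to make
    /// the (expensive) delta-layer counting worthwhile.
    fn should_check(&mut self, current_lsn: u64) -> bool {
        let min_distance = self.check_threshold * self.checkpoint_distance;
        if current_lsn.saturating_sub(self.last_check_at) < min_distance {
            return false;
        }
        self.last_check_at = current_lsn;
        true
    }
}

fn main() {
    let mut gate = ImageLayerCheckGate {
        checkpoint_distance: 256 * 1024 * 1024,
        check_threshold: 2,
        last_check_at: 0,
    };
    assert!(!gate.should_check(100 * 1024 * 1024)); // not enough WAL yet
    assert!(gate.should_check(600 * 1024 * 1024)); // >= 2 checkpoint distances
}
```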
2024-03-28 17:44:55 +00:00
John Spray
39d1818ae9 storage controller: be more tolerant of control plane blocking notifications (#7268)
## Problem

- Control plane can deadlock if it calls into a function that requires
reconciliation to complete, while refusing compute notification hooks
API calls.

## Summary of changes

- Fail faster in the notify path on 438 errors: these were originally
expected to be transient, but in practice it's more common that a 438
results from an operation blocked on the current API call rather than
from something happening in the background.
- In ensure_attached, relax the condition for spawning a reconciler:
instead of just the general maybe_reconcile path, do a pre-check that
skips trying to reconcile if the shard appears to be attached. This
avoids doing work in cases where the tenant is attached, but is dirty
from a reconciliation point of view, e.g. due to a failed compute
notification.
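
A hedged sketch of the retry-policy change, with a simplified error enum (the real `NotifyError` and `backoff::retry` live in the storage controller): the `Busy` case is now classified as permanent, so the calling reconciler fails promptly instead of spinning while the control plane holds its lock.

```rust
// Simplified illustration; not the storage controller's actual error type.
enum NotifyError {
    SlowDown,      // throttling response: keep retrying
    Busy,          // control plane reports the tenant locked: now treated as permanent
    Fatal(String), // client errors we never retry
}

fn is_permanent(e: &NotifyError) -> bool {
    // Before this change only Fatal/Unexpected stopped the retry loop; Busy now
    // does too, so the reconciler can fail and let the control plane release its lock.
    matches!(e, NotifyError::Fatal(_) | NotifyError::Busy)
}

fn main() {
    assert!(!is_permanent(&NotifyError::SlowDown));
    assert!(is_permanent(&NotifyError::Busy));
    assert!(is_permanent(&NotifyError::Fatal("bad request".into())));
}
```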
2024-03-28 17:38:08 +00:00
Alex Chi Z
90be79fcf5 spec: allow neon extension auto-upgrade + softfail upgrade (#7231)
reverts https://github.com/neondatabase/neon/pull/7128, unblocks
https://github.com/neondatabase/cloud/issues/10742

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-03-28 17:22:35 +00:00
Alexander Bayandin
c52b80b930 CI(deploy): Do not deploy storage controller to preprod for proxy releases (#7269)
## Problem

A proxy release to preprod automatically triggers a deployment of the storage
controller (`deployStorageController=true` by default).

## Summary of changes
- Set `deployStorageController=false` for proxy releases to preprod
- Explicitly set `deployStorageController=true` for storage releases to
preprod and prod
2024-03-28 16:51:45 +00:00
Anastasia Lubennikova
722f271f6e Specify caller in 'unexpected response from page server' error (#7272)
Tiny improvement for log messages to investigate
https://github.com/neondatabase/cloud/issues/11559
2024-03-28 15:28:58 +00:00
Alex Chi Z
be1d8fc4f7 fix: drop replication slot causes postgres stuck on exit (#7192)
Fix https://github.com/neondatabase/neon/issues/6969

Ref https://github.com/neondatabase/postgres/pull/395
https://github.com/neondatabase/postgres/pull/396

Postgres gets stuck on exit if the replication slot is not dropped
before shutting down. This is caused by Neon's custom WAL record for
recording replication slots. The pull requests in the postgres repo fix
the problem, and this pull request bumps the postgres commit.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-03-28 15:24:36 +00:00
49 changed files with 397 additions and 646 deletions

View File

@@ -147,15 +147,16 @@ jobs:
"neonvm-captest-new"
],
"db_size": [ "10gb" ],
"include": [{ "platform": "neon-captest-freetier", "db_size": "3gb" },
{ "platform": "neon-captest-new", "db_size": "50gb" },
{ "platform": "neonvm-captest-freetier", "db_size": "3gb" },
{ "platform": "neonvm-captest-new", "db_size": "50gb" }]
"include": [{ "platform": "neon-captest-freetier", "db_size": "3gb" },
{ "platform": "neon-captest-new", "db_size": "50gb" },
{ "platform": "neonvm-captest-freetier", "db_size": "3gb" },
{ "platform": "neonvm-captest-new", "db_size": "50gb" },
{ "platform": "neonvm-captest-sharding-reuse", "db_size": "50gb" }]
}'
if [ "$(date +%A)" = "Saturday" ]; then
matrix=$(echo "$matrix" | jq '.include += [{ "platform": "rds-postgres", "db_size": "10gb"},
{ "platform": "rds-aurora", "db_size": "50gb"}]')
{ "platform": "rds-aurora", "db_size": "50gb"}]')
fi
echo "matrix=$(echo "$matrix" | jq --compact-output '.')" >> $GITHUB_OUTPUT
@@ -171,7 +172,7 @@ jobs:
if [ "$(date +%A)" = "Saturday" ] || [ ${RUN_AWS_RDS_AND_AURORA} = "true" ]; then
matrix=$(echo "$matrix" | jq '.include += [{ "platform": "rds-postgres" },
{ "platform": "rds-aurora" }]')
{ "platform": "rds-aurora" }]')
fi
echo "matrix=$(echo "$matrix" | jq --compact-output '.')" >> $GITHUB_OUTPUT
@@ -190,7 +191,7 @@ jobs:
if [ "$(date +%A)" = "Saturday" ] || [ ${RUN_AWS_RDS_AND_AURORA} = "true" ]; then
matrix=$(echo "$matrix" | jq '.include += [{ "platform": "rds-postgres", "scale": "10" },
{ "platform": "rds-aurora", "scale": "10" }]')
{ "platform": "rds-aurora", "scale": "10" }]')
fi
echo "matrix=$(echo "$matrix" | jq --compact-output '.')" >> $GITHUB_OUTPUT
@@ -253,6 +254,9 @@ jobs:
neon-captest-reuse)
CONNSTR=${{ secrets.BENCHMARK_CAPTEST_CONNSTR }}
;;
neonvm-captest-sharding-reuse)
CONNSTR=${{ secrets.BENCHMARK_CAPTEST_SHARDING_CONNSTR }}
;;
neon-captest-new | neon-captest-freetier | neonvm-captest-new | neonvm-captest-freetier)
CONNSTR=${{ steps.create-neon-project.outputs.dsn }}
;;
@@ -270,11 +274,15 @@ jobs:
echo "connstr=${CONNSTR}" >> $GITHUB_OUTPUT
QUERY="SELECT version();"
QUERIES=("SELECT version()")
if [[ "${PLATFORM}" = "neon"* ]]; then
QUERY="${QUERY} SHOW neon.tenant_id; SHOW neon.timeline_id;"
QUERIES+=("SHOW neon.tenant_id")
QUERIES+=("SHOW neon.timeline_id")
fi
psql ${CONNSTR} -c "${QUERY}"
for q in "${QUERIES[@]}"; do
psql ${CONNSTR} -c "${q}"
done
- name: Benchmark init
uses: ./.github/actions/run-python-test-set
@@ -401,11 +409,15 @@ jobs:
echo "connstr=${CONNSTR}" >> $GITHUB_OUTPUT
QUERY="SELECT version();"
QUERIES=("SELECT version()")
if [[ "${PLATFORM}" = "neon"* ]]; then
QUERY="${QUERY} SHOW neon.tenant_id; SHOW neon.timeline_id;"
QUERIES+=("SHOW neon.tenant_id")
QUERIES+=("SHOW neon.timeline_id")
fi
psql ${CONNSTR} -c "${QUERY}"
for q in "${QUERIES[@]}"; do
psql ${CONNSTR} -c "${q}"
done
- name: ClickBench benchmark
uses: ./.github/actions/run-python-test-set
@@ -507,11 +519,15 @@ jobs:
echo "connstr=${CONNSTR}" >> $GITHUB_OUTPUT
QUERY="SELECT version();"
QUERIES=("SELECT version()")
if [[ "${PLATFORM}" = "neon"* ]]; then
QUERY="${QUERY} SHOW neon.tenant_id; SHOW neon.timeline_id;"
QUERIES+=("SHOW neon.tenant_id")
QUERIES+=("SHOW neon.timeline_id")
fi
psql ${CONNSTR} -c "${QUERY}"
for q in "${QUERIES[@]}"; do
psql ${CONNSTR} -c "${q}"
done
- name: Run TPC-H benchmark
uses: ./.github/actions/run-python-test-set
@@ -597,11 +613,15 @@ jobs:
echo "connstr=${CONNSTR}" >> $GITHUB_OUTPUT
QUERY="SELECT version();"
QUERIES=("SELECT version()")
if [[ "${PLATFORM}" = "neon"* ]]; then
QUERY="${QUERY} SHOW neon.tenant_id; SHOW neon.timeline_id;"
QUERIES+=("SHOW neon.tenant_id")
QUERIES+=("SHOW neon.timeline_id")
fi
psql ${CONNSTR} -c "${QUERY}"
for q in "${QUERIES[@]}"; do
psql ${CONNSTR} -c "${q}"
done
- name: Run user examples
uses: ./.github/actions/run-python-test-set

View File

@@ -1127,6 +1127,7 @@ jobs:
-f deployProxy=false \
-f deployStorage=true \
-f deployStorageBroker=true \
-f deployStorageController=true \
-f branch=main \
-f dockerTag=${{needs.tag.outputs.build-tag}} \
-f deployPreprodRegion=true
@@ -1136,6 +1137,7 @@ jobs:
-f deployProxy=false \
-f deployStorage=true \
-f deployStorageBroker=true \
-f deployStorageController=true \
-f branch=main \
-f dockerTag=${{needs.tag.outputs.build-tag}}
elif [[ "$GITHUB_REF_NAME" == "release-proxy" ]]; then
@@ -1144,6 +1146,7 @@ jobs:
-f deployProxy=true \
-f deployStorage=false \
-f deployStorageBroker=false \
-f deployStorageController=false \
-f branch=main \
-f dockerTag=${{needs.tag.outputs.build-tag}} \
-f deployPreprodRegion=true

Cargo.lock generated
View File

@@ -5934,9 +5934,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tokio"
version = "1.36.0"
version = "1.37.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61285f6515fa018fb2d1e46eb21223fff441ee8db5d0f1435e8ab4f5cdb80931"
checksum = "1adbebffeca75fcfd058afa480fb6c0b81e165a0323f9c9d39c9697e37c46787"
dependencies = [
"backtrace",
"bytes",

View File

@@ -944,6 +944,9 @@ RUN mkdir /var/db && useradd -m -d /var/db/postgres postgres && \
COPY --from=postgres-cleanup-layer --chown=postgres /usr/local/pgsql /usr/local
COPY --from=compute-tools --chown=postgres /home/nonroot/target/release-line-debug-size-lto/compute_ctl /usr/local/bin/compute_ctl
# Create remote extension download directory
RUN mkdir /usr/local/download_extensions && chown -R postgres:postgres /usr/local/download_extensions
# Install:
# libreadline8 for psql
# libicu67, locales for collations (including ICU and plpgsql_check)

View File

@@ -1262,10 +1262,12 @@ LIMIT 100",
.await
.map_err(DownloadError::Other);
self.ext_download_progress
.write()
.expect("bad lock")
.insert(ext_archive_name.to_string(), (download_start, true));
if download_size.is_ok() {
self.ext_download_progress
.write()
.expect("bad lock")
.insert(ext_archive_name.to_string(), (download_start, true));
}
download_size
}

View File

@@ -743,21 +743,24 @@ pub fn handle_extension_neon(client: &mut Client) -> Result<()> {
// which may happen in two cases:
// - extension was just installed
// - extension was already installed and is up to date
// DISABLED due to compute node unpinning epic
// let query = "ALTER EXTENSION neon UPDATE";
// info!("update neon extension version with query: {}", query);
// client.simple_query(query)?;
let query = "ALTER EXTENSION neon UPDATE";
info!("update neon extension version with query: {}", query);
if let Err(e) = client.simple_query(query) {
error!(
"failed to upgrade neon extension during `handle_extension_neon`: {}",
e
);
}
Ok(())
}
#[instrument(skip_all)]
pub fn handle_neon_extension_upgrade(_client: &mut Client) -> Result<()> {
info!("handle neon extension upgrade (not really)");
// DISABLED due to compute node unpinning epic
// let query = "ALTER EXTENSION neon UPDATE";
// info!("update neon extension version with query: {}", query);
// client.simple_query(query)?;
pub fn handle_neon_extension_upgrade(client: &mut Client) -> Result<()> {
info!("handle neon extension upgrade");
let query = "ALTER EXTENSION neon UPDATE";
info!("update neon extension version with query: {}", query);
client.simple_query(query)?;
Ok(())
}

View File

@@ -14,7 +14,6 @@ use utils::{
use crate::service::Config;
const BUSY_DELAY: Duration = Duration::from_secs(1);
const SLOWDOWN_DELAY: Duration = Duration::from_secs(5);
pub(crate) const API_CONCURRENCY: usize = 32;
@@ -280,11 +279,10 @@ impl ComputeHook {
Err(NotifyError::SlowDown)
}
StatusCode::LOCKED => {
// Delay our retry if busy: the usual fast exponential backoff in backoff::retry
// is not appropriate
tokio::time::timeout(BUSY_DELAY, cancel.cancelled())
.await
.ok();
// We consider this fatal, because it's possible that the operation blocking the control plane is
// also the one that is waiting for this reconcile. We should let the reconciler calling
// this hook fail, to give the control plane a chance to un-lock.
tracing::info!("Control plane reports tenant is locked, dropping out of notify");
Err(NotifyError::Busy)
}
StatusCode::SERVICE_UNAVAILABLE
@@ -306,7 +304,12 @@ impl ComputeHook {
let client = reqwest::Client::new();
backoff::retry(
|| self.do_notify_iteration(&client, url, &reconfigure_request, cancel),
|e| matches!(e, NotifyError::Fatal(_) | NotifyError::Unexpected(_)),
|e| {
matches!(
e,
NotifyError::Fatal(_) | NotifyError::Unexpected(_) | NotifyError::Busy
)
},
3,
10,
"Send compute notification",

View File

@@ -37,6 +37,9 @@ pub(crate) struct StorageControllerMetricGroup {
pub(crate) storage_controller_reconcile_complete:
measured::CounterVec<ReconcileCompleteLabelGroupSet>,
/// Count of how many times we make an optimization change to a tenant's scheduling
pub(crate) storage_controller_schedule_optimization: measured::Counter,
/// HTTP request status counters for handled requests
pub(crate) storage_controller_http_request_status:
measured::CounterVec<HttpRequestStatusLabelGroupSet>,
@@ -101,6 +104,7 @@ impl StorageControllerMetricGroup {
status: StaticLabelSet::new(),
},
),
storage_controller_schedule_optimization: measured::Counter::new(),
storage_controller_http_request_status: measured::CounterVec::new(
HttpRequestStatusLabelGroupSet {
path: lasso::ThreadedRodeo::new(),

View File

@@ -101,15 +101,6 @@ impl PageserverClient {
)
}
pub(crate) async fn tenant_heatmap_upload(&self, tenant_id: TenantShardId) -> Result<()> {
measured_request!(
"tenant_heatmap_upload",
crate::metrics::Method::Post,
&self.node_id_label,
self.inner.tenant_heatmap_upload(tenant_id).await
)
}
pub(crate) async fn location_config(
&self,
tenant_shard_id: TenantShardId,

View File

@@ -487,6 +487,7 @@ impl Reconciler {
while let Err(e) = self.compute_notify().await {
match e {
NotifyError::Fatal(_) => return Err(ReconcileError::Notify(e)),
NotifyError::ShuttingDown => return Err(ReconcileError::Cancel),
_ => {
tracing::warn!(
"Live migration blocked by compute notification error, retrying: {e}"

View File

@@ -288,8 +288,15 @@ impl Scheduler {
node.and_then(|(node_id, may_schedule)| if may_schedule { Some(node_id) } else { None })
}
/// Hard Exclude: only consider nodes not in this list.
/// Soft exclude: only use nodes in this list if no others are available.
hard_exclude: it is forbidden to use nodes in this list, typically because they
/// are already in use by this shard -- we use this to avoid picking the same node
/// as both attached and secondary location. This is a hard constraint: if we cannot
/// find any nodes that aren't in this list, then we will return a [`ScheduleError::ImpossibleConstraint`].
///
/// context: we prefer to avoid using nodes identified in the context, according
to their anti-affinity score. We use this to prefer to avoid placing shards in
/// the same tenant on the same node. This is a soft constraint: the context will never
/// cause us to fail to schedule a shard.
pub(crate) fn schedule_shard(
&self,
hard_exclude: &[NodeId],

View File

@@ -3409,14 +3409,6 @@ impl Service {
.join(",")
);
// Optimization: publish heatmaps immediately, so that secondary locations can start warming up.
for child in child_ids {
if let Err(e) = client.tenant_heatmap_upload(*child).await {
// Non-fatal, this is just an optimization
tracing::warn!("Failed to upload child {child} heatmap: {e}");
}
}
if &response.new_shards != child_ids {
// This should never happen: the pageserver should agree with us on how shard splits work.
return Err(ApiError::InternalServerError(anyhow::anyhow!(
@@ -3972,9 +3964,6 @@ impl Service {
/// Helper for methods that will try and call pageserver APIs for
/// a tenant, such as timeline CRUD: they cannot proceed unless the tenant
/// is attached somewhere.
///
/// TODO: this doesn't actually ensure attached unless the PlacementPolicy is
/// an attached policy. We should error out if it isn't.
fn ensure_attached_schedule(
&self,
mut locked: std::sync::RwLockWriteGuard<'_, ServiceState>,
@@ -3984,10 +3973,26 @@ impl Service {
let (nodes, tenants, scheduler) = locked.parts_mut();
let mut schedule_context = ScheduleContext::default();
for (_tenant_shard_id, shard) in tenants.range_mut(TenantShardId::tenant_range(tenant_id)) {
for (tenant_shard_id, shard) in tenants.range_mut(TenantShardId::tenant_range(tenant_id)) {
shard.schedule(scheduler, &mut schedule_context)?;
// The shard's policies may not result in an attached location being scheduled: this
// is an error because our caller needs it attached somewhere.
if shard.intent.get_attached().is_none() {
return Err(anyhow::anyhow!(
"Tenant {tenant_id} not scheduled to be attached"
));
};
if shard.stably_attached().is_some() {
// We do not require the shard to be totally up to date on reconciliation: we just require
// that it has been attached on the intended node. Other dirty state such as unattached secondary
// locations, or compute hook notifications can be ignored.
continue;
}
if let Some(waiter) = self.maybe_reconcile_shard(shard, nodes) {
tracing::info!("Waiting for shard {tenant_shard_id} to reconcile, in order to ensure it is attached");
waiters.push(waiter);
}
}
@@ -4113,6 +4118,18 @@ impl Service {
break;
}
match shard.get_scheduling_policy() {
ShardSchedulingPolicy::Active => {
// Ok to do optimization
}
ShardSchedulingPolicy::Essential
| ShardSchedulingPolicy::Pause
| ShardSchedulingPolicy::Stop => {
// Policy prevents optimizing this shard.
continue;
}
}
// Accumulate the schedule context for all the shards in a tenant: we must have
// the total view of all shards before we can try to optimize any of them.
schedule_context.avoid(&shard.intent.all_pageservers());
@@ -4131,11 +4148,12 @@ impl Service {
continue;
}
if tenant_shards
.iter()
.any(|s| !matches!(s.splitting, SplitState::Idle))
{
// Never attempt to optimize a tenant that is currently being split
if tenant_shards.iter().any(|s| {
!matches!(s.splitting, SplitState::Idle)
|| matches!(s.policy, PlacementPolicy::Detached)
}) {
// Never attempt to optimize a tenant that is currently being split, or
// a tenant that is meant to be detached
continue;
}

View File

@@ -723,6 +723,11 @@ impl TenantState {
scheduler: &mut Scheduler,
optimization: ScheduleOptimization,
) {
metrics::METRICS_REGISTRY
.metrics_group
.storage_controller_schedule_optimization
.inc();
match optimization {
ScheduleOptimization::MigrateAttachment(MigrateAttachment {
old_attached_node_id,
@@ -1080,6 +1085,10 @@ impl TenantState {
self.scheduling_policy = p;
}
pub(crate) fn get_scheduling_policy(&self) -> &ShardSchedulingPolicy {
&self.scheduling_policy
}
pub(crate) fn from_persistent(
tsp: TenantShardPersistence,
intent: IntentState,

View File

@@ -389,6 +389,10 @@ impl PageServerNode {
.remove("image_creation_threshold")
.map(|x| x.parse::<usize>())
.transpose()?,
image_layer_creation_check_threshold: settings
.remove("image_layer_creation_check_threshold")
.map(|x| x.parse::<u8>())
.transpose()?,
pitr_interval: settings.remove("pitr_interval").map(|x| x.to_string()),
walreceiver_connect_timeout: settings
.remove("walreceiver_connect_timeout")
@@ -501,6 +505,12 @@ impl PageServerNode {
.map(|x| x.parse::<usize>())
.transpose()
.context("Failed to parse 'image_creation_threshold' as non zero integer")?,
image_layer_creation_check_threshold: settings
.remove("image_layer_creation_check_threshold")
.map(|x| x.parse::<u8>())
.transpose()
.context("Failed to parse 'image_creation_check_threshold' as integer")?,
pitr_interval: settings.remove("pitr_interval").map(|x| x.to_string()),
walreceiver_connect_timeout: settings
.remove("walreceiver_connect_timeout")

View File

@@ -0,0 +1,54 @@
## Merged compute image
https://github.com/neondatabase/neon/issues/6685
### Motivation:
It's hard to manage compute pools for 3 Postgres versions: we have a compute image for each version of Postgres (currently 3 for neonVM and 3 for k8s pods; eventually we will have only neonVMs).
We can try putting all Postgres versions into a single image, which should dramatically improve pool usage.
### TODO
#### Compute code changes:
1. Create merged compute image https://github.com/neondatabase/neon/pull/6808
2. Pass compute version in spec from control-plane
3. Change the path to postgres in `compute_ctl`; currently it is not specified explicitly.
`compute_ctl` has `pgbin` and `pgdata` arguments, but currently they are used only in tests.
4. Make changes to the custom_extension code - fix path handling.
#### Control-plane changes:
1. Pass compute version in spec from control-plane
2. Remove old logic of VM pools management
#### Prewarm changes:
Currently, for pooled VMs, we prewarm postgres to improve cold start speed
```
// If this is a pooled VM, prewarm before starting HTTP server and becoming
// available for binding. Prewarming helps Postgres start quicker later,
// because QEMU will already have its memory allocated from the host, and
// the necessary binaries will already be cached.
```
Prewarm = initdb + start postgres + rm pgdata
Q: How should we do prewarm if we don't know in advance what version of postgres will be used?
I see two options (a sketch of the first one follows this list):
- use versioned pgdata directories and run prewarm operations for all existing versions.
- choose a "default_version" for each pooled VM and run prewarm for it. Try to start compute in a pooled VM with a matching version; if one doesn't exist, spin up compute in any available VM. Start will be slower because it is not prewarmed.
#### Extensions support
To support the merged compute image (an image containing all supported versions of postgres),
we need to offload extensions from the image. We can implement this using the "custom extensions" mechanism.
Custom extensions changes:
1. We need to move all extensions from main compute image file to the build-custom-extensions repo
2. We need to generate a spec for all public extensions and pass it to the compute image (see the sketch after this list).
The spec contains information about the files in the extension and their paths,
as well as the contents of the control file. Currently it is set manually per user, for the few users that use "rare" custom extensions. We need to improve spec passing.
For public extensions, we can embed this spec into the compute image: use the artifact from the build-custom-extension CI step and put it into the compute image.
3. We need to test the performance of extension downloading and ensure that it doesn't affect cold starts (with the proxy, the speed should be fine).
4. Note that in this task we are not trying to solve the extension versioning issue; we assume that all extensions are mapped to compute images 1-1 as they are now.
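
As an illustration of point 2, a hypothetical shape for such a spec (field names are assumptions, not the actual format):

```rust
// Hypothetical extension spec shape; field names are illustrative only.
struct ExtensionSpec {
    /// Extension name, e.g. "postgis".
    name: String,
    /// Full contents of the extension's .control file.
    control_file: String,
    /// Files shipped by the extension: (path inside the archive, install path).
    files: Vec<(String, String)>,
}

fn main() {
    let spec = ExtensionSpec {
        name: "postgis".to_string(),
        control_file: "comment = 'PostGIS geometry and geography types'\n".to_string(),
        files: vec![(
            "lib/postgis-3.so".to_string(),
            "$libdir/postgis-3.so".to_string(),
        )],
    };
    println!("{} ships {} file(s)", spec.name, spec.files.len());
}
```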
#### Test changes:
- This is general functionality and will be covered by e2e tests.
- We will need to add a test for extensions to ensure that they are available for every new compute version. We don't need to run extension regression tests here; just ensure that `CREATE EXTENSION ext;` works.

View File

@@ -301,6 +301,7 @@ pub struct TenantConfig {
pub heatmap_period: Option<String>,
pub lazy_slru_download: Option<bool>,
pub timeline_get_throttle: Option<ThrottleConfig>,
pub image_layer_creation_check_threshold: Option<u8>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]

View File

@@ -271,17 +271,6 @@ impl Client {
Ok((status, progress))
}
pub async fn tenant_heatmap_upload(&self, tenant_id: TenantShardId) -> Result<()> {
let path = reqwest::Url::parse(&format!(
"{}/v1/tenant/{}/heatmap_upload",
self.mgmt_api_endpoint, tenant_id
))
.expect("Cannot build URL");
self.request(Method::POST, path, ()).await?;
Ok(())
}
pub async fn location_config(
&self,
tenant_shard_id: TenantShardId,

View File

@@ -3653,6 +3653,9 @@ pub(crate) mod harness {
heatmap_period: Some(tenant_conf.heatmap_period),
lazy_slru_download: Some(tenant_conf.lazy_slru_download),
timeline_get_throttle: Some(tenant_conf.timeline_get_throttle),
image_layer_creation_check_threshold: Some(
tenant_conf.image_layer_creation_check_threshold,
),
}
}
}

View File

@@ -57,6 +57,9 @@ pub mod defaults {
// throughputs up to 1GiB/s per timeline.
pub const DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG: u64 = 1024 * 1024 * 1024;
pub const DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD: &str = "24 hour";
// By default ingest enough WAL for two new L0 layers before checking if new
// image layers should be created.
pub const DEFAULT_IMAGE_LAYER_CREATION_CHECK_THRESHOLD: u8 = 2;
pub const DEFAULT_INGEST_BATCH_SIZE: u64 = 100;
}
@@ -362,6 +365,10 @@ pub struct TenantConf {
pub lazy_slru_download: bool,
pub timeline_get_throttle: pageserver_api::models::ThrottleConfig,
// How much WAL must be ingested before checking again whether a new image layer is required.
// Expressed in multiples of checkpoint distance.
pub image_layer_creation_check_threshold: u8,
}
/// Same as TenantConf, but this struct preserves the information about
@@ -454,6 +461,9 @@ pub struct TenantConfOpt {
#[serde(skip_serializing_if = "Option::is_none")]
pub timeline_get_throttle: Option<pageserver_api::models::ThrottleConfig>,
#[serde(skip_serializing_if = "Option::is_none")]
pub image_layer_creation_check_threshold: Option<u8>,
}
impl TenantConfOpt {
@@ -508,6 +518,9 @@ impl TenantConfOpt {
.timeline_get_throttle
.clone()
.unwrap_or(global_conf.timeline_get_throttle),
image_layer_creation_check_threshold: self
.image_layer_creation_check_threshold
.unwrap_or(global_conf.image_layer_creation_check_threshold),
}
}
}
@@ -548,6 +561,7 @@ impl Default for TenantConf {
heatmap_period: Duration::ZERO,
lazy_slru_download: false,
timeline_get_throttle: crate::tenant::throttle::Config::disabled(),
image_layer_creation_check_threshold: DEFAULT_IMAGE_LAYER_CREATION_CHECK_THRESHOLD,
}
}
}
@@ -621,6 +635,7 @@ impl From<TenantConfOpt> for models::TenantConfig {
heatmap_period: value.heatmap_period.map(humantime),
lazy_slru_download: value.lazy_slru_download,
timeline_get_throttle: value.timeline_get_throttle.map(ThrottleConfig::from),
image_layer_creation_check_threshold: value.image_layer_creation_check_threshold,
}
}
}

View File

@@ -309,6 +309,8 @@ pub struct Timeline {
/// Configuration: how often should the partitioning be recalculated.
repartition_threshold: u64,
last_image_layer_creation_check_at: AtomicLsn,
/// Current logical size of the "datadir", at the last LSN.
current_logical_size: LogicalSize,
@@ -1632,6 +1634,15 @@ impl Timeline {
.unwrap_or(default_tenant_conf.evictions_low_residence_duration_metric_threshold)
}
fn get_image_layer_creation_check_threshold(&self) -> u8 {
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
tenant_conf.image_layer_creation_check_threshold.unwrap_or(
self.conf
.default_tenant_conf
.image_layer_creation_check_threshold,
)
}
pub(super) fn tenant_conf_updated(&self) {
// NB: Most tenant conf options are read by background loops, so,
// changes will automatically be picked up.
@@ -1769,6 +1780,7 @@ impl Timeline {
},
partitioning: tokio::sync::Mutex::new((KeyPartitioning::new(), Lsn(0))),
repartition_threshold: 0,
last_image_layer_creation_check_at: AtomicLsn::new(0),
last_received_wal: Mutex::new(None),
rel_size_cache: RwLock::new(HashMap::new()),
@@ -1797,6 +1809,7 @@ impl Timeline {
};
result.repartition_threshold =
result.get_checkpoint_distance() / REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE;
result
.metrics
.last_record_gauge
@@ -3501,6 +3514,24 @@ impl Timeline {
// Is it time to create a new image layer for the given partition?
async fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> bool {
let last = self.last_image_layer_creation_check_at.load();
if lsn != Lsn(0) {
let distance = lsn
.checked_sub(last)
.expect("Attempt to compact with LSN going backwards");
let min_distance = self.get_image_layer_creation_check_threshold() as u64
* self.get_checkpoint_distance();
// Skip the expensive delta layer counting below if we've not ingested
// sufficient WAL since the last check.
if distance.0 < min_distance {
return false;
}
}
self.last_image_layer_creation_check_at.store(lsn);
let threshold = self.get_image_creation_threshold();
let guard = self.layers.read().await;

View File

@@ -1688,7 +1688,7 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum)
break;
default:
neon_log(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag);
neon_log(ERROR, "unexpected response from page server with tag 0x%02x in neon_exists", resp->tag);
}
pfree(resp);
return exists;
@@ -2224,7 +2224,7 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
((NeonErrorResponse *) resp)->message)));
break;
default:
neon_log(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag);
neon_log(ERROR, "unexpected response from page server with tag 0x%02x in neon_read_at_lsn", resp->tag);
}
/* buffer was used, clean up for later reuse */
@@ -2497,7 +2497,7 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum)
break;
default:
neon_log(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag);
neon_log(ERROR, "unexpected response from page server with tag 0x%02x in neon_nblocks", resp->tag);
}
update_cached_relsize(InfoFromSMgrRel(reln), forknum, n_blocks);
@@ -2552,7 +2552,7 @@ neon_dbsize(Oid dbNode)
break;
default:
neon_log(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag);
neon_log(ERROR, "unexpected response from page server with tag 0x%02x in neon_dbsize", resp->tag);
}
neon_log(SmgrTrace, "neon_dbsize: db %u (request LSN %X/%08X): %ld bytes",
@@ -2857,7 +2857,7 @@ neon_read_slru_segment(SMgrRelation reln, const char* path, int segno, void* buf
break;
default:
neon_log(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag);
neon_log(ERROR, "unexpected response from page server with tag 0x%02x in neon_read_slru_segment", resp->tag);
}
pfree(resp);

View File

@@ -94,4 +94,5 @@ select = [
"I", # isort
"W", # pycodestyle
"B", # bugbear
"UP032", # f-string
]

View File

@@ -64,14 +64,14 @@ def subprocess_capture(capture_dir: str, cmd: List[str], **kwargs: Any) -> str:
Returns basepath for files with captured output.
"""
assert isinstance(cmd, list)
base = os.path.basename(cmd[0]) + "_{}".format(global_counter())
base = f"{os.path.basename(cmd[0])}_{global_counter()}"
basepath = os.path.join(capture_dir, base)
stdout_filename = basepath + ".stdout"
stderr_filename = basepath + ".stderr"
with open(stdout_filename, "w") as stdout_f:
with open(stderr_filename, "w") as stderr_f:
print('(capturing output to "{}.stdout")'.format(base))
print(f'(capturing output to "{base}.stdout")')
subprocess.run(cmd, **kwargs, stdout=stdout_f, stderr=stderr_f)
return basepath
@@ -82,11 +82,9 @@ class PgBin:
def __init__(self, log_dir: Path, pg_distrib_dir, pg_version):
self.log_dir = log_dir
self.pg_bin_path = os.path.join(str(pg_distrib_dir), "v{}".format(pg_version), "bin")
self.pg_bin_path = os.path.join(str(pg_distrib_dir), f"v{pg_version}", "bin")
self.env = os.environ.copy()
self.env["LD_LIBRARY_PATH"] = os.path.join(
str(pg_distrib_dir), "v{}".format(pg_version), "lib"
)
self.env["LD_LIBRARY_PATH"] = os.path.join(str(pg_distrib_dir), f"v{pg_version}", "lib")
def _fixpath(self, command: List[str]):
if "/" not in command[0]:
@@ -110,7 +108,7 @@ class PgBin:
"""
self._fixpath(command)
print('Running command "{}"'.format(" ".join(command)))
print(f'Running command "{" ".join(command)}"')
env = self._build_env(env)
subprocess.run(command, env=env, cwd=cwd, check=True)
@@ -128,7 +126,7 @@ class PgBin:
"""
self._fixpath(command)
print('Running command "{}"'.format(" ".join(command)))
print(f'Running command "{" ".join(command)}"')
env = self._build_env(env)
return subprocess_capture(
str(self.log_dir), command, env=env, cwd=cwd, check=True, **kwargs
@@ -300,7 +298,7 @@ class NeonPageserverHttpClient(requests.Session):
def lsn_to_hex(num: int) -> str:
"""Convert lsn from int to standard hex notation."""
return "{:X}/{:X}".format(num >> 32, num & 0xFFFFFFFF)
return f"{num >> 32:X}/{num & 0xFFFFFFFF:X}"
def lsn_from_hex(lsn_hex: str) -> int:
@@ -331,16 +329,12 @@ def wait_for_upload(
if current_lsn >= lsn:
return
print(
"waiting for remote_consistent_lsn to reach {}, now {}, iteration {}".format(
lsn_to_hex(lsn), lsn_to_hex(current_lsn), i + 1
)
f"waiting for remote_consistent_lsn to reach {lsn_to_hex(lsn)}, now {lsn_to_hex(current_lsn)}, iteration {i + 1}"
)
time.sleep(1)
raise Exception(
"timed out while waiting for remote_consistent_lsn to reach {}, was {}".format(
lsn_to_hex(lsn), lsn_to_hex(current_lsn)
)
f"timed out while waiting for remote_consistent_lsn to reach {lsn_to_hex(lsn)}, was {lsn_to_hex(current_lsn)}"
)

View File

@@ -482,20 +482,18 @@ def pytest_terminal_summary(
terminalreporter.section("Benchmark results", "-")
is_header_printed = True
terminalreporter.write(
"{}.{}: ".format(test_report.head_line, recorded_property["name"])
)
terminalreporter.write(f"{test_report.head_line}.{recorded_property['name']}: ")
unit = recorded_property["unit"]
value = recorded_property["value"]
if unit == "MB":
terminalreporter.write("{0:,.0f}".format(value), green=True)
terminalreporter.write(f"{value:,.0f}", green=True)
elif unit in ("s", "ms") and isinstance(value, float):
terminalreporter.write("{0:,.3f}".format(value), green=True)
terminalreporter.write(f"{value:,.3f}", green=True)
elif isinstance(value, float):
terminalreporter.write("{0:,.4f}".format(value), green=True)
terminalreporter.write(f"{value:,.4f}", green=True)
else:
terminalreporter.write(str(value), green=True)
terminalreporter.line(" {}".format(unit))
terminalreporter.line(f" {unit}")
result_entry.append(recorded_property)

View File

@@ -3605,7 +3605,7 @@ class Safekeeper:
return self
def stop(self, immediate: bool = False) -> "Safekeeper":
log.info("Stopping safekeeper {}".format(self.id))
log.info(f"Stopping safekeeper {self.id}")
self.env.neon_cli.safekeeper_stop(self.id, immediate)
self.running = False
return self
@@ -4037,13 +4037,13 @@ def check_restored_datadir_content(test_output_dir: Path, env: NeonEnv, endpoint
for f in mismatch:
f1 = os.path.join(endpoint.pgdata_dir, f)
f2 = os.path.join(restored_dir_path, f)
stdout_filename = "{}.filediff".format(f2)
stdout_filename = f"{f2}.filediff"
with open(stdout_filename, "w") as stdout_f:
subprocess.run("xxd -b {} > {}.hex ".format(f1, f1), shell=True)
subprocess.run("xxd -b {} > {}.hex ".format(f2, f2), shell=True)
subprocess.run(f"xxd -b {f1} > {f1}.hex ", shell=True)
subprocess.run(f"xxd -b {f2} > {f2}.hex ", shell=True)
cmd = "diff {}.hex {}.hex".format(f1, f2)
cmd = f"diff {f1}.hex {f2}.hex"
subprocess.run([cmd], stdout=stdout_f, shell=True)
assert (mismatch, error) == ([], [])

View File

@@ -204,13 +204,11 @@ def wait_for_last_record_lsn(
return current_lsn
if i % 10 == 0:
log.info(
"{}/{} waiting for last_record_lsn to reach {}, now {}, iteration {}".format(
tenant, timeline, lsn, current_lsn, i + 1
)
f"{tenant}/{timeline} waiting for last_record_lsn to reach {lsn}, now {current_lsn}, iteration {i + 1}"
)
time.sleep(0.1)
raise Exception(
"timed out while waiting for last_record_lsn to reach {}, was {}".format(lsn, current_lsn)
f"timed out while waiting for last_record_lsn to reach {lsn}, was {current_lsn}"
)

View File

@@ -125,19 +125,19 @@ async def run_update_loop_worker(ep: Endpoint, n_txns: int, idx: int):
await conn.execute(f"ALTER TABLE {table} SET (autovacuum_enabled = false)")
await conn.execute(f"INSERT INTO {table} VALUES (1, 0)")
await conn.execute(
f"""
CREATE PROCEDURE updating{table}() as
$$
DECLARE
i integer;
BEGIN
FOR i IN 1..{n_txns} LOOP
UPDATE {table} SET x = x + 1 WHERE pk=1;
COMMIT;
END LOOP;
END
$$ LANGUAGE plpgsql
"""
CREATE PROCEDURE updating{0}() as
$$
DECLARE
i integer;
BEGIN
FOR i IN 1..{1} LOOP
UPDATE {0} SET x = x + 1 WHERE pk=1;
COMMIT;
END LOOP;
END
$$ LANGUAGE plpgsql
""".format(table, n_txns)
)
await conn.execute("SET statement_timeout=0")
await conn.execute(f"call updating{table}()")

View File

@@ -78,7 +78,7 @@ def test_branch_creation_heavy_write(neon_compare: NeonCompare, n_branches: int)
p = random.randint(0, i)
timer = timeit.default_timer()
env.neon_cli.create_branch("b{}".format(i + 1), "b{}".format(p), tenant_id=tenant)
env.neon_cli.create_branch(f"b{i + 1}", f"b{p}", tenant_id=tenant)
dur = timeit.default_timer() - timer
log.info(f"Creating branch b{i+1} took {dur}s")

View File

@@ -1,79 +0,0 @@
import concurrent.futures
import threading
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnvBuilder,
PgBin,
)
from fixtures.types import TenantId, TimelineId
def test_sharding_split_big_tenant(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
"""
Check that splitting works as expected for a tenant with a reasonable amount of data, larger
than we use in a typical test.
"""
neon_env_builder.num_pageservers = 4
env = neon_env_builder.init_configs()
neon_env_builder.start()
tenant_id = TenantId.generate()
timeline_id = TimelineId.generate()
env.neon_cli.create_tenant(
tenant_id, timeline_id, shard_count=1, placement_policy='{"Attached":1}'
)
# TODO: a large scale/size
expect_size = 100e6
scale = 500
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
with env.endpoints.create_start(
"main",
tenant_id=tenant_id,
) as ep:
options = "-cstatement_timeout=0 " + ep.default_options.get("options", "")
connstr = ep.connstr(password=None, options=options)
password = ep.default_options.get("password", None)
environ = {}
if password is not None:
environ["PGPASSWORD"] = password
args = ["pgbench", f"-s{scale}", "-i", "-I", "dtGvp", connstr]
# Write a lot of data into the tenant
pg_bin.run(args, env=environ)
# Confirm that we have created a physical size as large as expected
timeline_info = env.storage_controller.pageserver_api().timeline_detail(
tenant_id, timeline_id
)
log.info(f"Timeline after init: {timeline_info}")
assert timeline_info["current_physical_size"] > expect_size
background_job_duration = 30
background_stop = threading.Event()
def background_load():
while not background_stop.is_set():
args = [
"pgbench",
"-N",
"-c4",
f"-T{background_job_duration}",
"-P2",
"--progress-timestamp",
connstr,
]
pg_bin.run(args, env=environ)
bg_fut = executor.submit(background_load)
# Do a split while the endpoint is alive
env.storage_controller.tenant_shard_split(tenant_id, shard_count=4)
# Pump the scheduler to do all the changes it would do in the background
# after a shard split.
env.storage_controller.reconcile_until_idle(timeout_secs=300)
background_stop.set()
bg_fut.result(timeout=background_job_duration * 2)

View File

@@ -1,109 +0,0 @@
import concurrent.futures
import random
import time
from fixtures.neon_fixtures import (
NeonEnvBuilder,
)
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.pg_version import PgVersion
from fixtures.types import TenantId, TenantShardId, TimelineId
def test_sharding_service_many_tenants(
neon_env_builder: NeonEnvBuilder,
):
"""
Check that we cope well with a not-totally-trivial number of tenants.
This is checking for:
- Obvious concurrency bugs from issuing many tenant creations/modifications
concurrently.
- Obvious scaling bugs like O(N^2) scaling that would be so slow that even
a basic test starts failing from slowness.
This is _not_ a comprehensive scale test: just a basic sanity check that
we don't fall over for a thousand shards.
"""
neon_env_builder.num_pageservers = 5
env = neon_env_builder.init_start()
# Total tenants
tenant_count = 2000
# Shards per tenant
shard_count = 2
stripe_size = 1024
tenants = set(TenantId.generate() for _i in range(0, tenant_count))
virtual_ps_http = PageserverHttpClient(env.storage_controller_port, lambda: True)
# We use a fixed seed to make the test reproducible: we want a randomly
# chosen order, but not to change the order every time we run the test.
rng = random.Random(1234)
# We will create tenants directly via API, not via neon_local, to avoid any false
# serialization of operations in neon_local (it e.g. loads/saves a config file on each call)
with concurrent.futures.ThreadPoolExecutor() as executor:
futs = []
for tenant_id in tenants:
f = executor.submit(
env.storage_controller.tenant_create, tenant_id, shard_count, stripe_size
)
futs.append(f)
# Wait for creations to finish
for f in futs:
f.result()
# Generate a mixture of operations and dispatch them all concurrently
futs = []
for tenant_id in tenants:
op = rng.choice([0, 1, 2])
if op == 0:
# A fan-out write operation to all shards in a tenant (timeline creation)
f = executor.submit(
virtual_ps_http.timeline_create,
PgVersion.NOT_SET,
tenant_id,
TimelineId.generate(),
)
elif op == 1:
# A reconciler operation: migrate a shard.
shard_number = rng.randint(0, shard_count - 1)
tenant_shard_id = TenantShardId(tenant_id, shard_number, shard_count)
dest_ps_id = rng.choice([ps.id for ps in env.pageservers])
f = executor.submit(
env.storage_controller.tenant_shard_migrate, tenant_shard_id, dest_ps_id
)
elif op == 2:
# A passthrough read to shard zero
f = executor.submit(virtual_ps_http.tenant_status, tenant_id)
futs.append(f)
# Wait for mixed ops to finish
for f in futs:
f.result()
# Rolling node failures: this is a small number of requests, but results in a large
# number of scheduler calls and reconcile tasks.
for pageserver in env.pageservers:
env.storage_controller.node_configure(pageserver.id, {"availability": "Offline"})
# The sleeps are just to make sure we aren't optimizing-away any re-scheduling operations
# from a brief flap in node state.
time.sleep(1)
env.storage_controller.node_configure(pageserver.id, {"availability": "Active"})
time.sleep(1)
# Restart the storage controller
env.storage_controller.stop()
env.storage_controller.start()
# Restart pageservers: this exercises the /re-attach API
for pageserver in env.pageservers:
pageserver.stop()
pageserver.start()

View File

@@ -189,6 +189,7 @@ def test_fully_custom_config(positive_env: NeonEnv):
},
"trace_read_requests": True,
"walreceiver_connect_timeout": "13m",
"image_layer_creation_check_threshold": 1,
}
ps_http = env.pageserver.http_client()

View File

@@ -84,11 +84,11 @@ def test_branching_with_pgbench(
threads = []
if ty == "cascade":
env.neon_cli.create_branch("b{}".format(i + 1), "b{}".format(i), tenant_id=tenant)
env.neon_cli.create_branch(f"b{i + 1}", f"b{i}", tenant_id=tenant)
else:
env.neon_cli.create_branch("b{}".format(i + 1), "b0", tenant_id=tenant)
env.neon_cli.create_branch(f"b{i + 1}", "b0", tenant_id=tenant)
endpoints.append(env.endpoints.create_start("b{}".format(i + 1), tenant_id=tenant))
endpoints.append(env.endpoints.create_start(f"b{i + 1}", tenant_id=tenant))
threads.append(
threading.Thread(target=run_pgbench, args=(endpoints[-1].connstr(),), daemon=True)

View File

@@ -74,8 +74,8 @@ def test_large_schema(neon_env_builder: NeonEnvBuilder):
cur.execute("select * from pg_depend order by refclassid, refobjid, refobjsubid")
# Check layer file sizes
timeline_path = "{}/tenants/{}/timelines/{}/".format(
env.pageserver.workdir, env.initial_tenant, env.initial_timeline
timeline_path = (
f"{env.pageserver.workdir}/tenants/{env.initial_tenant}/timelines/{env.initial_timeline}/"
)
for filename in os.listdir(timeline_path):
if filename.startswith("00000"):

View File

@@ -57,9 +57,7 @@ def test_layer_bloating(neon_simple_env: NeonEnv, vanilla_pg):
time.sleep(10)
# Check layer file sizes
timeline_path = "{}/tenants/{}/timelines/{}/".format(
env.pageserver.workdir, env.initial_tenant, timeline
)
timeline_path = f"{env.pageserver.workdir}/tenants/{env.initial_tenant}/timelines/{timeline}/"
log.info(f"Check {timeline_path}")
for filename in os.listdir(timeline_path):
if filename.startswith("00000"):

View File

@@ -165,6 +165,7 @@ def test_gc_of_remote_layers(neon_env_builder: NeonEnvBuilder):
"compaction_threshold": "3",
# "image_creation_threshold": set at runtime
"compaction_target_size": f"{128 * (1024**2)}", # make it so that we only have 1 partition => image coverage for delta layers => enables gc of delta layers
"image_layer_creation_check_threshold": "0", # always check if a new image layer can be created
}
def tenant_update_config(changes):

View File

@@ -53,6 +53,7 @@ def test_issue_5878(neon_env_builder: NeonEnvBuilder):
"checkpoint_timeout": "24h", # something we won't reach
"checkpoint_distance": f"{50 * (1024**2)}", # something we won't reach, we checkpoint manually
"image_creation_threshold": "100", # we want to control when image is created
"image_layer_creation_check_threshold": "0",
"compaction_threshold": f"{l0_l1_threshold}",
"compaction_target_size": f"{128 * (1024**3)}", # make it so that we only have 1 partition => image coverage for delta layers => enables gc of delta layers
}

View File

@@ -568,6 +568,8 @@ def test_compaction_downloads_on_demand_with_image_creation(neon_env_builder: Ne
"image_creation_threshold": 100,
# repartitioning parameter, unused
"compaction_target_size": 128 * 1024**2,
# Always check if a new image layer can be created
"image_layer_creation_check_threshold": 0,
# pitr_interval and gc_horizon are not interesting because we dont run gc
}
@@ -632,7 +634,8 @@ def test_compaction_downloads_on_demand_with_image_creation(neon_env_builder: Ne
# threshold to expose image creation to downloading all of the needed
# layers -- threshold of 2 would sound more reasonable, but keeping it as 1
# to be less flaky
env.neon_cli.config_tenant(tenant_id, {"image_creation_threshold": "1"})
conf["image_creation_threshold"] = "1"
env.neon_cli.config_tenant(tenant_id, {k: str(v) for k, v in conf.items()})
pageserver_http.timeline_compact(tenant_id, timeline_id)
layers = pageserver_http.layer_map_info(tenant_id, timeline_id)

View File

@@ -9,7 +9,6 @@ of the pageserver are:
- Updates to remote_consistent_lsn may only be made visible after validating generation
"""
import enum
import re
import time
@@ -53,6 +52,7 @@ TENANT_CONF = {
"compaction_period": "0s",
# create image layers eagerly, so that GC can remove some layers
"image_creation_threshold": "1",
"image_layer_creation_check_threshold": "0",
}

View File

@@ -22,7 +22,7 @@ def test_read_validation(neon_simple_env: NeonEnv):
with closing(endpoint.connect()) as con:
with con.cursor() as c:
for e in extensions:
c.execute("create extension if not exists {};".format(e))
c.execute(f"create extension if not exists {e};")
c.execute("create table foo (c int) with (autovacuum_enabled = false)")
c.execute("insert into foo values (1)")
@@ -42,14 +42,12 @@ def test_read_validation(neon_simple_env: NeonEnv):
log.info("Test table is populated, validating buffer cache")
cache_entries = query_scalar(
c, "select count(*) from pg_buffercache where relfilenode = {}".format(relfilenode)
c, f"select count(*) from pg_buffercache where relfilenode = {relfilenode}"
)
assert cache_entries > 0, "No buffers cached for the test relation"
c.execute(
"select reltablespace, reldatabase, relfilenode from pg_buffercache where relfilenode = {}".format(
relfilenode
)
f"select reltablespace, reldatabase, relfilenode from pg_buffercache where relfilenode = {relfilenode}"
)
reln = c.fetchone()
assert reln is not None
@@ -59,22 +57,20 @@ def test_read_validation(neon_simple_env: NeonEnv):
c.execute("select clear_buffer_cache()")
cache_entries = query_scalar(
c, "select count(*) from pg_buffercache where relfilenode = {}".format(relfilenode)
c, f"select count(*) from pg_buffercache where relfilenode = {relfilenode}"
)
assert cache_entries == 0, "Failed to clear buffer cache"
log.info("Cache is clear, reading stale page version")
c.execute(
"select lsn, lower, upper from page_header(get_raw_page_at_lsn('foo', 'main', 0, '{}'))".format(
first[0]
)
f"select lsn, lower, upper from page_header(get_raw_page_at_lsn('foo', 'main', 0, '{first[0]}'))"
)
direct_first = c.fetchone()
assert first == direct_first, "Failed fetch page at historic lsn"
cache_entries = query_scalar(
c, "select count(*) from pg_buffercache where relfilenode = {}".format(relfilenode)
c, f"select count(*) from pg_buffercache where relfilenode = {relfilenode}"
)
assert cache_entries == 0, "relation buffers detected after invalidation"
@@ -87,7 +83,7 @@ def test_read_validation(neon_simple_env: NeonEnv):
assert second == direct_latest, "Failed fetch page at latest lsn"
cache_entries = query_scalar(
c, "select count(*) from pg_buffercache where relfilenode = {}".format(relfilenode)
c, f"select count(*) from pg_buffercache where relfilenode = {relfilenode}"
)
assert cache_entries == 0, "relation buffers detected after invalidation"
@@ -96,9 +92,7 @@ def test_read_validation(neon_simple_env: NeonEnv):
)
c.execute(
"select lsn, lower, upper from page_header(get_raw_page_at_lsn( {}, {}, {}, 0, 0, '{}' ))".format(
reln[0], reln[1], reln[2], first[0]
)
f"select lsn, lower, upper from page_header(get_raw_page_at_lsn({reln[0]}, {reln[1]}, {reln[2]}, 0, 0, '{first[0]}'))"
)
direct_first = c.fetchone()
assert first == direct_first, "Failed fetch page at historic lsn using oid"
@@ -108,9 +102,7 @@ def test_read_validation(neon_simple_env: NeonEnv):
)
c.execute(
"select lsn, lower, upper from page_header(get_raw_page_at_lsn( {}, {}, {}, 0, 0, NULL ))".format(
reln[0], reln[1], reln[2]
)
f"select lsn, lower, upper from page_header(get_raw_page_at_lsn({reln[0]}, {reln[1]}, {reln[2]}, 0, 0, NULL))"
)
direct_latest = c.fetchone()
assert second == direct_latest, "Failed fetch page at latest lsn"
@@ -122,9 +114,7 @@ def test_read_validation(neon_simple_env: NeonEnv):
)
c.execute(
"select lsn, lower, upper from page_header(get_raw_page_at_lsn( {}, {}, {}, 0, 0, '{}' ))".format(
reln[0], reln[1], reln[2], first[0]
)
f"select lsn, lower, upper from page_header(get_raw_page_at_lsn({reln[0]}, {reln[1]}, {reln[2]}, 0, 0, '{first[0]}'))"
)
direct_first = c.fetchone()
assert first == direct_first, "Failed fetch page at historic lsn using oid"
@@ -134,7 +124,7 @@ def test_read_validation(neon_simple_env: NeonEnv):
c.execute("select * from page_header(get_raw_page('foo', 'main', 0));")
raise AssertionError("query should have failed")
except UndefinedTable as e:
log.info("Caught an expected failure: {}".format(e))
log.info(f"Caught an expected failure: {e}")
def test_read_validation_neg(neon_simple_env: NeonEnv):
@@ -148,7 +138,7 @@ def test_read_validation_neg(neon_simple_env: NeonEnv):
with closing(endpoint.connect()) as con:
with con.cursor() as c:
for e in extensions:
c.execute("create extension if not exists {};".format(e))
c.execute(f"create extension if not exists {e};")
log.info("read a page of a missing relation")
try:
@@ -157,7 +147,7 @@ def test_read_validation_neg(neon_simple_env: NeonEnv):
)
raise AssertionError("query should have failed")
except UndefinedTable as e:
log.info("Caught an expected failure: {}".format(e))
log.info(f"Caught an expected failure: {e}")
c.execute("create table foo (c int) with (autovacuum_enabled = false)")
c.execute("insert into foo values (1)")
@@ -169,7 +159,7 @@ def test_read_validation_neg(neon_simple_env: NeonEnv):
)
raise AssertionError("query should have failed")
except IoError as e:
log.info("Caught an expected failure: {}".format(e))
log.info(f"Caught an expected failure: {e}")
log.info("Pass NULL as an input")
expected = (None, None, None)

View File

@@ -245,6 +245,7 @@ def test_remote_storage_upload_queue_retries(
"compaction_period": "0s",
# create image layers eagerly, so that GC can remove some layers
"image_creation_threshold": "1",
"image_layer_creation_check_threshold": "0",
}
)

View File

@@ -1,4 +1,3 @@
import json
import os
import time
from collections import defaultdict
@@ -254,6 +253,10 @@ def test_sharding_split_smoke(
# The old parent shards should no longer exist on disk
assert not shards_on_disk(old_shard_ids)
# Enough background reconciliations should result in the shards being properly distributed.
# Run this before the workload, because its LSN-waiting code presumes stable locations.
env.storage_controller.reconcile_until_idle()
workload.validate()
workload.churn_rows(256)
@@ -267,14 +270,6 @@ def test_sharding_split_smoke(
pageserver.http_client().timeline_gc(tenant_shard_id, timeline_id, None)
workload.validate()
# Enough background reconciliations should result in the shards being properly distributed
env.storage_controller.reconcile_until_idle()
# We have 8 shards and 16 nodes
# Initially I expect 4 nodes to have 2 attached locations each, and another 8 nodes to have
# 1 secondary location each
# 2 2 2 2 1 1 1 1 1 1 1 1 0 0 0 0
# Assert on how many reconciles happened during the process. This is something of an
# implementation detail, but it is useful to detect any bugs that might generate spurious
# extra reconcile iterations.
@@ -333,6 +328,31 @@ def test_sharding_split_smoke(
assert sum(total.values()) == split_shard_count * 2
check_effective_tenant_config()
# More specific check: that we are fully balanced. This is deterministic because
# the order in which we consider shards for optimization is deterministic, and the
# order of preference of nodes is also deterministic (lower node IDs win).
log.info(f"total: {total}")
assert total == {
1: 1,
2: 1,
3: 1,
4: 1,
5: 1,
6: 1,
7: 1,
8: 1,
9: 1,
10: 1,
11: 1,
12: 1,
13: 1,
14: 1,
15: 1,
16: 1,
}
log.info(f"attached: {attached}")
assert attached == {1: 1, 2: 1, 3: 1, 5: 1, 6: 1, 7: 1, 9: 1, 11: 1}
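For reference, the total and attached maps asserted on here are per-node location counts gathered from the storage controller. A minimal sketch of how they can be built, mirroring the get_node_shard_counts helper used elsewhere in the test suite (the response field names are taken from that helper and assumed to match here):

from collections import defaultdict

def count_locations(env, tenant_id):
    # total counts every location (attached + secondary) per node; attached counts attachments only
    total: defaultdict[int, int] = defaultdict(int)
    attached: defaultdict[int, int] = defaultdict(int)
    for shard in env.storage_controller.tenant_describe(tenant_id)["shards"]:
        attached[int(shard["node_attached"])] += 1
        total[int(shard["node_attached"])] += 1
        for node in shard["node_secondary"]:
            total[int(node)] += 1
    return total, attached

With 8 shards and 16 pageservers, full balance means each node holds exactly one location (attached or secondary) and the 8 attachments land on 8 distinct nodes, which is exactly what the two dict assertions encode.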
# Ensure post-split pageserver locations survive a restart (i.e. the child shards
# correctly wrote config to disk, and the storage controller responds correctly
# to /re-attach)
@@ -391,6 +411,7 @@ def test_sharding_split_stripe_size(
env.storage_controller.tenant_shard_split(
tenant_id, shard_count=2, shard_stripe_size=new_stripe_size
)
env.storage_controller.reconcile_until_idle()
# Check that we ended up with the stripe size that we expected, both on the pageserver
# and in the notifications to compute
@@ -859,6 +880,7 @@ def test_sharding_split_failures(
# Having failed+rolled back, we should be able to split again
# No failures this time; it will succeed
env.storage_controller.tenant_shard_split(tenant_id, shard_count=split_shard_count)
env.storage_controller.reconcile_until_idle(timeout_secs=30)
workload.churn_rows(10)
workload.validate()
@@ -912,6 +934,10 @@ def test_sharding_split_failures(
finish_split()
assert_split_done()
# Having completed the split, pump the background reconciles to ensure that
# the scheduler reaches an idle state
env.storage_controller.reconcile_until_idle(timeout_secs=30)
env.storage_controller.consistency_check()
@@ -1040,82 +1066,3 @@ def test_sharding_backpressure(neon_env_builder: NeonEnvBuilder):
max_lsn = max(Lsn(info["last_record_lsn"]) for info in infos)
diff = max_lsn - min_lsn
assert diff < 2 * 1024 * 1024, f"LSN diff={diff}, expected diff < 2MB due to backpressure"
# Stripe sizes in number of pages.
TINY_STRIPES = 16
LARGE_STRIPES = 32768
@pytest.mark.parametrize("stripe_size", [TINY_STRIPES, LARGE_STRIPES])
def test_sharding_compaction(neon_env_builder: NeonEnvBuilder, stripe_size: int):
"""
Use small stripes, small layers, and small compaction thresholds to exercise how compaction
and image layer generation interacts with sharding.
"""
TENANT_CONF = {
# small checkpointing and compaction targets to ensure we generate many upload operations
"checkpoint_distance": f"{128 * 1024}",
"compaction_threshold": "1",
"compaction_target_size": f"{128 * 1024}",
# no PITR horizon, we specify the horizon when we request on-demand GC
"pitr_interval": "0s",
# disable background compaction and GC. We invoke it manually when we want it to happen.
"gc_period": "0s",
"compaction_period": "0s",
# create image layers eagerly: we want to exercise image layer creation in this test.
"image_creation_threshold": "1",
"image_layer_creation_check_threshold": 0,
}
neon_env_builder.num_pageservers = 4
env = neon_env_builder.init_start(
initial_tenant_conf=TENANT_CONF,
initial_tenant_shard_count=4,
initial_tenant_shard_stripe_size=stripe_size,
)
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
workload = Workload(env, tenant_id, timeline_id)
workload.init()
workload.write_rows(64)
for _i in range(0, 10):
# Each of these does some writes then a checkpoint: because we set image_creation_threshold to 1,
# these should result in image layers each time we write some data into a shard, and also shards
# receiving less data hitting their "empty image layer" path (where they should skip writing the layer,
# rather than asserting)
workload.churn_rows(64)
# Assert that we got some image layers: this is important because this test's purpose is to exercise the sharding changes
# to Timeline::create_image_layers, so if we weren't creating any image layers we wouldn't be doing our job.
shard_has_image_layers = []
for shard in env.storage_controller.locate(tenant_id):
pageserver = env.get_pageserver(shard["node_id"])
shard_id = shard["shard_id"]
layer_map = pageserver.http_client().layer_map_info(shard_id, timeline_id)
image_layer_sizes = {}
for layer in layer_map.historic_layers:
if layer.kind == "Image":
image_layer_sizes[layer.layer_file_name] = layer.layer_file_size
# Pageserver should assert rather than emit an empty layer file, but double check here
assert layer.layer_file_size is not None
assert layer.layer_file_size > 0
shard_has_image_layers.append(len(image_layer_sizes) > 1)
log.info(f"Shard {shard_id} layer sizes: {json.dumps(image_layer_sizes, indent=2)}")
# TODO: once keyspace partitioning is updated, assert that layer sizes are as expected
# (see https://github.com/neondatabase/neon/issues/6774)
if stripe_size == TINY_STRIPES:
# Expect writes were scattered across all pageservers: they should all have compacted some image layers
assert all(shard_has_image_layers)
else:
# With large stripes, it is expected that most of our writes went to one pageserver, so we just require
# that at least one of them has some image layers.
assert any(shard_has_image_layers)
# Assert that everything is still readable
workload.validate()
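As a side note on the two parametrized cases: stripe sizes here are counted in 8 KiB pages, so the byte widths work out as below (a back-of-the-envelope sketch, not part of the test). This is why tiny stripes scatter the workload's writes across all four shards, while large stripes keep a small table almost entirely within one shard.

PAGE_SIZE = 8192  # bytes per page

TINY_STRIPES = 16
LARGE_STRIPES = 32768

print(TINY_STRIPES * PAGE_SIZE)   # 131072 bytes = 128 KiB per stripe: writes hit every shard
print(LARGE_STRIPES * PAGE_SIZE)  # 268435456 bytes = 256 MiB per stripe: most writes land on one shard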

View File

@@ -433,10 +433,13 @@ def test_sharding_service_compute_hook(
# Set up fake HTTP notify endpoint
notifications = []
handle_params = {"status": 200}
def handler(request: Request):
log.info(f"Notify request: {request}")
status = handle_params["status"]
log.info(f"Notify request[{status}]: {request}")
notifications.append(request.json)
return Response(status=200)
return Response(status=status)
httpserver.expect_request("/notify", method="PUT").respond_with_handler(handler)
@@ -504,6 +507,24 @@ def test_sharding_service_compute_hook(
wait_until(10, 1, received_split_notification)
# If the compute hook is unavailable, that should not block creating a tenant and
# creating a timeline. This simulates a control plane refusing to accept notifications
handle_params["status"] = 423
degraded_tenant_id = TenantId.generate()
degraded_timeline_id = TimelineId.generate()
env.storage_controller.tenant_create(degraded_tenant_id)
env.storage_controller.pageserver_api().timeline_create(
PgVersion.NOT_SET, degraded_tenant_id, degraded_timeline_id
)
# Ensure we hit the handler error path
env.storage_controller.allowed_errors.append(
".*Failed to notify compute of attached pageserver.*tenant busy.*"
)
env.storage_controller.allowed_errors.append(".*Reconcile error.*tenant busy.*")
assert notifications[-1] is not None
assert notifications[-1]["tenant_id"] == str(degraded_tenant_id)
env.storage_controller.consistency_check()

View File

@@ -1,188 +0,0 @@
import concurrent.futures
import random
from collections import defaultdict
from fixtures.compute_reconfigure import ComputeReconfigure
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
)
from fixtures.types import TenantId, TenantShardId, TimelineId
from fixtures.utils import wait_until
from fixtures.workload import Workload
def get_node_shard_counts(env: NeonEnv, tenant_ids):
total: defaultdict[int, int] = defaultdict(int)
attached: defaultdict[int, int] = defaultdict(int)
for tid in tenant_ids:
for shard in env.storage_controller.tenant_describe(tid)["shards"]:
log.info(
f"{shard['tenant_shard_id']}: attached={shard['node_attached']}, secondary={shard['node_secondary']} "
)
for node in shard["node_secondary"]:
total[int(node)] += 1
attached[int(shard["node_attached"])] += 1
total[int(shard["node_attached"])] += 1
return total, attached
def test_storcon_rolling_failures(
neon_env_builder: NeonEnvBuilder,
compute_reconfigure_listener: ComputeReconfigure,
):
neon_env_builder.num_pageservers = 8
neon_env_builder.control_plane_compute_hook_api = (
compute_reconfigure_listener.control_plane_compute_hook_api
)
workloads: dict[TenantId, Workload] = {}
env = neon_env_builder.init_start()
for ps in env.pageservers:
# We will do unclean detaches
ps.allowed_errors.append(".*Dropped remote consistent LSN updates.*")
n_tenants = 32
tenants = [(env.initial_tenant, env.initial_timeline)]
for i in range(0, n_tenants - 1):
tenant_id = TenantId.generate()
timeline_id = TimelineId.generate()
shard_count = [1, 2, 4][i % 3]
env.neon_cli.create_tenant(
tenant_id, timeline_id, shard_count=shard_count, placement_policy='{"Double":1}'
)
tenants.append((tenant_id, timeline_id))
# Background pain:
# - TODO: some fraction of pageserver API requests hang
# (this requires implementing wrap of location_conf calls with proper timeline/cancel)
# - TODO: continuous tenant/timeline creation/destruction over a different ID range than
# the ones we're using for availability checks.
rng = random.Random(0xDEADBEEF)
for tenant_id, timeline_id in tenants:
workload = Workload(env, tenant_id, timeline_id)
compute_reconfigure_listener.register_workload(workload)
workloads[tenant_id] = workload
def node_evacuated(node_id: int):
total, attached = get_node_shard_counts(env, [t[0] for t in tenants])
assert attached[node_id] == 0
def attachments_active():
for tid, _tlid in tenants:
for shard in env.storage_controller.locate(tid):
psid = shard["node_id"]
tsid = TenantShardId.parse(shard["shard_id"])
status = env.get_pageserver(psid).http_client().tenant_status(tenant_id=tsid)
assert status["state"]["slug"] == "Active"
log.info(f"Shard {tsid} active on node {psid}")
failpoints = ("api-503", "5%1000*return(1)")
failpoints_str = f"{failpoints[0]}={failpoints[1]}"
for ps in env.pageservers:
ps.http_client().configure_failpoints(failpoints)
def for_all_workloads(callback, timeout=60):
futs = []
with concurrent.futures.ThreadPoolExecutor() as pool:
for _tenant_id, workload in workloads.items():
futs.append(pool.submit(callback, workload))
for f in futs:
f.result(timeout=timeout)
def clean_fail_restore():
"""
Clean shutdown of a node: mark it offline in storage controller, wait for new attachment
locations to activate, then SIGTERM it.
- Endpoints should not fail any queries
- New attach locations should activate within bounded time.
"""
victim = rng.choice(env.pageservers)
env.storage_controller.node_configure(victim.id, {"availability": "Offline"})
wait_until(10, 1, lambda node_id=victim.id: node_evacuated(node_id)) # type: ignore[misc]
wait_until(10, 1, attachments_active)
victim.stop(immediate=False)
traffic()
victim.start(extra_env_vars={"FAILPOINTS": failpoints_str})
# Revert shards to attach at their original locations
# TODO
# env.storage_controller.balance_attached()
wait_until(10, 1, attachments_active)
def hard_fail_restore():
"""
Simulate an unexpected death of a pageserver node
"""
victim = rng.choice(env.pageservers)
victim.stop(immediate=True)
# TODO: once we implement heartbeats detecting node failures, remove this
# explicit marking offline and rely on storage controller to detect it itself.
env.storage_controller.node_configure(victim.id, {"availability": "Offline"})
wait_until(10, 1, lambda node_id=victim.id: node_evacuated(node_id)) # type: ignore[misc]
wait_until(10, 1, attachments_active)
traffic()
victim.start(extra_env_vars={"FAILPOINTS": failpoints_str})
# TODO
# env.storage_controller.balance_attached()
wait_until(10, 1, attachments_active)
def traffic():
"""
Check that all tenants are working for postgres clients
"""
def exercise_one(workload):
workload.churn_rows(100)
workload.validate()
for_all_workloads(exercise_one)
def init_one(workload):
workload.init()
workload.write_rows(100)
for_all_workloads(init_one, timeout=60)
for i in range(0, 20):
mode = rng.choice([0, 1, 2])
log.info(f"Iteration {i}, mode {mode}")
if mode == 0:
# Traffic interval: sometimes, instead of a failure, just let the clients
# write a load of data. This avoids chaos tests ending up with unrealistically
# small quantities of data in flight.
traffic()
elif mode == 1:
clean_fail_restore()
elif mode == 2:
hard_fail_restore()
# Fail and restart: hard-kill one node. Notify the storage controller that it is offline.
# Success criteria:
# - New attach locations should activate within bounded time
# - TODO: once we do heartbeating, we should not have to explicitly mark the node offline
# TODO: fail and remove: fail a node, and remove it from the cluster.
# Success criteria:
# - Endpoints should not fail any queries
# - New attach locations should activate within bounded time
# - New secondary locations should fill up with data within bounded time
# TODO: somehow need to wait for reconciles to complete before doing consistency check
# (or make the check wait).
# Do consistency check on every iteration, not just at the end: this makes it more obvious
# which change caused an issue.
env.storage_controller.consistency_check()

View File

@@ -103,9 +103,7 @@ def test_many_timelines(neon_env_builder: NeonEnvBuilder):
n_timelines = 3
branch_names = [
"test_safekeepers_many_timelines_{}".format(tlin) for tlin in range(n_timelines)
]
branch_names = [f"test_safekeepers_many_timelines_{tlin}" for tlin in range(n_timelines)]
# pageserver, safekeeper operate timelines via their ids (can be represented in hex as 'ad50847381e248feaac9876cc71ae418')
# that's not really human readable, so the branch names are introduced in Neon CLI.
# Neon CLI stores its branch <-> timeline mapping in its internals,
@@ -1136,13 +1134,13 @@ def cmp_sk_wal(sks: List[Safekeeper], tenant_id: TenantId, timeline_id: Timeline
for f in mismatch:
f1 = os.path.join(sk0.timeline_dir(tenant_id, timeline_id), f)
f2 = os.path.join(sk.timeline_dir(tenant_id, timeline_id), f)
stdout_filename = "{}.filediff".format(f2)
stdout_filename = f"{f2}.filediff"
with open(stdout_filename, "w") as stdout_f:
subprocess.run("xxd {} > {}.hex ".format(f1, f1), shell=True)
subprocess.run("xxd {} > {}.hex ".format(f2, f2), shell=True)
subprocess.run(f"xxd {f1} > {f1}.hex ", shell=True)
subprocess.run(f"xxd {f2} > {f2}.hex ", shell=True)
cmd = "diff {}.hex {}.hex".format(f1, f2)
cmd = f"diff {f1}.hex {f2}.hex"
subprocess.run([cmd], stdout=stdout_f, shell=True)
assert (mismatch, not_regular) == (

View File

@@ -76,20 +76,20 @@ class WorkerStats(object):
self.counters[worker_id] += 1
def check_progress(self):
log.debug("Workers progress: {}".format(self.counters))
log.debug(f"Workers progress: {self.counters}")
# every worker should finish at least one tx
assert all(cnt > 0 for cnt in self.counters)
progress = sum(self.counters)
log.info("All workers made {} transactions".format(progress))
log.info(f"All workers made {progress} transactions")
async def run_random_worker(
stats: WorkerStats, endpoint: Endpoint, worker_id, n_accounts, max_transfer
):
pg_conn = await endpoint.connect_async()
log.debug("Started worker {}".format(worker_id))
log.debug(f"Started worker {worker_id}")
while stats.running:
from_uid = random.randint(0, n_accounts - 1)
@@ -99,9 +99,9 @@ async def run_random_worker(
await bank_transfer(pg_conn, from_uid, to_uid, amount)
stats.inc_progress(worker_id)
log.debug("Executed transfer({}) {} => {}".format(amount, from_uid, to_uid))
log.debug(f"Executed transfer({amount}) {from_uid} => {to_uid}")
log.debug("Finished worker {}".format(worker_id))
log.debug(f"Finished worker {worker_id}")
await pg_conn.close()

View File

@@ -1,5 +1,5 @@
{
"postgres-v16": "3946b2e2ea71d07af092099cb5bcae76a69b90d6",
"postgres-v15": "e7651e79c0c27fbddc3c724f5b9553222c28e395",
"postgres-v14": "748643b4683e9fe3b105011a6ba8a687d032cd65"
"postgres-v15": "64b8c7bccc6b77e04795e2d4cf6ad82dc8d987ed",
"postgres-v14": "a7b4c66156bce00afa60e5592d4284ba9e40b4cf"
}

View File

@@ -187,6 +187,14 @@ files:
query: |
select sum(pg_database_size(datname)) as total from pg_database;
- metric_name: lfc_approximate_working_set_size
type: gauge
help: 'Approximate working set size in pages of 8192 bytes'
key_labels:
values: [approximate_working_set_size]
query: |
select neon.approximate_working_set_size(false) as approximate_working_set_size;
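The exporter entry above only surfaces the value of a SQL function; a quick way to sanity-check it against a running compute is to run the same query by hand. A sketch using psycopg2 with an assumed local connection (host, port and user here are placeholders, not values from this change):

import psycopg2

conn = psycopg2.connect("dbname=postgres user=cloud_admin host=localhost port=55432")
try:
    with conn.cursor() as cur:
        cur.execute("select neon.approximate_working_set_size(false)")
        (pages,) = cur.fetchone()
        # the metric's help text says the unit is pages of 8192 bytes
        print(f"approximate working set: {pages} pages (~{pages * 8192 // (1024 * 1024)} MiB)")
finally:
    conn.close()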
build: |
# Build cgroup-tools
#