Mirror of https://github.com/neondatabase/neon.git (synced 2026-02-10 22:20:38 +00:00)

Compare commits: jcsp/storc...RFC_merged

13 Commits
| Author | SHA1 | Date |
|---|---|---|
| | efaf2c6663 | |
| | 90a8ff55fa | |
| | 3b95e8072a | |
| | 8ee54ffd30 | |
| | 3ab9f56f5f | |
| | 7ddc7b4990 | |
| | 63213fc814 | |
| | 090123a429 | |
| | 39d1818ae9 | |
| | 90be79fcf5 | |
| | c52b80b930 | |
| | 722f271f6e | |
| | be1d8fc4f7 | |
58 .github/workflows/benchmarking.yml (vendored)
@@ -147,15 +147,16 @@ jobs:
                "neonvm-captest-new"
              ],
              "db_size": [ "10gb" ],
-             "include": [{ "platform": "neon-captest-freetier", "db_size": "3gb" },
-                         { "platform": "neon-captest-new", "db_size": "50gb" },
-                         { "platform": "neonvm-captest-freetier", "db_size": "3gb" },
-                         { "platform": "neonvm-captest-new", "db_size": "50gb" }]
+             "include": [{ "platform": "neon-captest-freetier", "db_size": "3gb" },
+                         { "platform": "neon-captest-new", "db_size": "50gb" },
+                         { "platform": "neonvm-captest-freetier", "db_size": "3gb" },
+                         { "platform": "neonvm-captest-new", "db_size": "50gb" },
+                         { "platform": "neonvm-captest-sharding-reuse", "db_size": "50gb" }]
            }'

            if [ "$(date +%A)" = "Saturday" ]; then
              matrix=$(echo "$matrix" | jq '.include += [{ "platform": "rds-postgres", "db_size": "10gb"},
-                                                        { "platform": "rds-aurora", "db_size": "50gb"}]')
+                                                        { "platform": "rds-aurora", "db_size": "50gb"}]')
            fi

            echo "matrix=$(echo "$matrix" | jq --compact-output '.')" >> $GITHUB_OUTPUT
@@ -171,7 +172,7 @@ jobs:

            if [ "$(date +%A)" = "Saturday" ] || [ ${RUN_AWS_RDS_AND_AURORA} = "true" ]; then
              matrix=$(echo "$matrix" | jq '.include += [{ "platform": "rds-postgres" },
-                                                        { "platform": "rds-aurora" }]')
+                                                        { "platform": "rds-aurora" }]')
            fi

            echo "matrix=$(echo "$matrix" | jq --compact-output '.')" >> $GITHUB_OUTPUT
@@ -190,7 +191,7 @@ jobs:

            if [ "$(date +%A)" = "Saturday" ] || [ ${RUN_AWS_RDS_AND_AURORA} = "true" ]; then
              matrix=$(echo "$matrix" | jq '.include += [{ "platform": "rds-postgres", "scale": "10" },
-                                                        { "platform": "rds-aurora", "scale": "10" }]')
+                                                        { "platform": "rds-aurora", "scale": "10" }]')
            fi

            echo "matrix=$(echo "$matrix" | jq --compact-output '.')" >> $GITHUB_OUTPUT
@@ -253,6 +254,9 @@ jobs:
            neon-captest-reuse)
              CONNSTR=${{ secrets.BENCHMARK_CAPTEST_CONNSTR }}
              ;;
+           neonvm-captest-sharding-reuse)
+             CONNSTR=${{ secrets.BENCHMARK_CAPTEST_SHARDING_CONNSTR }}
+             ;;
            neon-captest-new | neon-captest-freetier | neonvm-captest-new | neonvm-captest-freetier)
              CONNSTR=${{ steps.create-neon-project.outputs.dsn }}
              ;;
@@ -270,11 +274,15 @@ jobs:
          echo "connstr=${CONNSTR}" >> $GITHUB_OUTPUT

-         QUERY="SELECT version();"
+         QUERIES=("SELECT version()")
          if [[ "${PLATFORM}" = "neon"* ]]; then
-           QUERY="${QUERY} SHOW neon.tenant_id; SHOW neon.timeline_id;"
+           QUERIES+=("SHOW neon.tenant_id")
+           QUERIES+=("SHOW neon.timeline_id")
          fi
-         psql ${CONNSTR} -c "${QUERY}"
+
+         for q in "${QUERIES[@]}"; do
+           psql ${CONNSTR} -c "${q}"
+         done

      - name: Benchmark init
        uses: ./.github/actions/run-python-test-set
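The hunk above replaces one concatenated `QUERY` string with a `QUERIES` array, so each statement runs as its own `psql` invocation and a failure points at the exact query. A rough Python equivalent of that loop, assuming `psql` is on `PATH` and `CONNSTR`/`PLATFORM` are provided as in the workflow (this is an illustration, not part of the workflow itself):

```python
import os
import subprocess

connstr = os.environ["CONNSTR"]            # assumed to be set, as in the workflow step
platform = os.environ.get("PLATFORM", "")

queries = ["SELECT version()"]
if platform.startswith("neon"):
    queries += ["SHOW neon.tenant_id", "SHOW neon.timeline_id"]

# One psql invocation per statement, mirroring the bash loop above.
for q in queries:
    subprocess.run(["psql", connstr, "-c", q], check=True)
```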
@@ -401,11 +409,15 @@ jobs:
          echo "connstr=${CONNSTR}" >> $GITHUB_OUTPUT

-         QUERY="SELECT version();"
+         QUERIES=("SELECT version()")
          if [[ "${PLATFORM}" = "neon"* ]]; then
-           QUERY="${QUERY} SHOW neon.tenant_id; SHOW neon.timeline_id;"
+           QUERIES+=("SHOW neon.tenant_id")
+           QUERIES+=("SHOW neon.timeline_id")
          fi
-         psql ${CONNSTR} -c "${QUERY}"
+
+         for q in "${QUERIES[@]}"; do
+           psql ${CONNSTR} -c "${q}"
+         done

      - name: ClickBench benchmark
        uses: ./.github/actions/run-python-test-set
@@ -507,11 +519,15 @@ jobs:
          echo "connstr=${CONNSTR}" >> $GITHUB_OUTPUT

-         QUERY="SELECT version();"
+         QUERIES=("SELECT version()")
          if [[ "${PLATFORM}" = "neon"* ]]; then
-           QUERY="${QUERY} SHOW neon.tenant_id; SHOW neon.timeline_id;"
+           QUERIES+=("SHOW neon.tenant_id")
+           QUERIES+=("SHOW neon.timeline_id")
          fi
-         psql ${CONNSTR} -c "${QUERY}"
+
+         for q in "${QUERIES[@]}"; do
+           psql ${CONNSTR} -c "${q}"
+         done

      - name: Run TPC-H benchmark
        uses: ./.github/actions/run-python-test-set
@@ -597,11 +613,15 @@ jobs:
          echo "connstr=${CONNSTR}" >> $GITHUB_OUTPUT

-         QUERY="SELECT version();"
+         QUERIES=("SELECT version()")
          if [[ "${PLATFORM}" = "neon"* ]]; then
-           QUERY="${QUERY} SHOW neon.tenant_id; SHOW neon.timeline_id;"
+           QUERIES+=("SHOW neon.tenant_id")
+           QUERIES+=("SHOW neon.timeline_id")
          fi
-         psql ${CONNSTR} -c "${QUERY}"
+
+         for q in "${QUERIES[@]}"; do
+           psql ${CONNSTR} -c "${q}"
+         done

      - name: Run user examples
        uses: ./.github/actions/run-python-test-set
3 .github/workflows/build_and_test.yml (vendored)
@@ -1127,6 +1127,7 @@ jobs:
|
||||
-f deployProxy=false \
|
||||
-f deployStorage=true \
|
||||
-f deployStorageBroker=true \
|
||||
-f deployStorageController=true \
|
||||
-f branch=main \
|
||||
-f dockerTag=${{needs.tag.outputs.build-tag}} \
|
||||
-f deployPreprodRegion=true
|
||||
@@ -1136,6 +1137,7 @@ jobs:
|
||||
-f deployProxy=false \
|
||||
-f deployStorage=true \
|
||||
-f deployStorageBroker=true \
|
||||
-f deployStorageController=true \
|
||||
-f branch=main \
|
||||
-f dockerTag=${{needs.tag.outputs.build-tag}}
|
||||
elif [[ "$GITHUB_REF_NAME" == "release-proxy" ]]; then
|
||||
@@ -1144,6 +1146,7 @@ jobs:
|
||||
-f deployProxy=true \
|
||||
-f deployStorage=false \
|
||||
-f deployStorageBroker=false \
|
||||
-f deployStorageController=false \
|
||||
-f branch=main \
|
||||
-f dockerTag=${{needs.tag.outputs.build-tag}} \
|
||||
-f deployPreprodRegion=true
|
||||
|
||||
4 Cargo.lock (generated)
@@ -5934,9 +5934,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"

 [[package]]
 name = "tokio"
-version = "1.36.0"
+version = "1.37.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "61285f6515fa018fb2d1e46eb21223fff441ee8db5d0f1435e8ab4f5cdb80931"
+checksum = "1adbebffeca75fcfd058afa480fb6c0b81e165a0323f9c9d39c9697e37c46787"
 dependencies = [
  "backtrace",
  "bytes",
@@ -944,6 +944,9 @@ RUN mkdir /var/db && useradd -m -d /var/db/postgres postgres && \
 COPY --from=postgres-cleanup-layer --chown=postgres /usr/local/pgsql /usr/local
 COPY --from=compute-tools --chown=postgres /home/nonroot/target/release-line-debug-size-lto/compute_ctl /usr/local/bin/compute_ctl

+# Create remote extension download directory
+RUN mkdir /usr/local/download_extensions && chown -R postgres:postgres /usr/local/download_extensions
+
 # Install:
 #   libreadline8 for psql
 #   libicu67, locales for collations (including ICU and plpgsql_check)
@@ -1262,10 +1262,12 @@ LIMIT 100",
         .await
         .map_err(DownloadError::Other);

-    self.ext_download_progress
-        .write()
-        .expect("bad lock")
-        .insert(ext_archive_name.to_string(), (download_start, true));
+    if download_size.is_ok() {
+        self.ext_download_progress
+            .write()
+            .expect("bad lock")
+            .insert(ext_archive_name.to_string(), (download_start, true));
+    }

     download_size
 }
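The hunk above records the extension archive as downloaded only when `download_size.is_ok()`, instead of unconditionally. A tiny Python sketch of that "record success only on success" guard (the names are illustrative, not compute_ctl's types):

```python
from typing import Dict, Tuple

def record_download(progress: Dict[str, Tuple[float, bool]],
                    archive_name: str,
                    started_at: float,
                    download_ok: bool) -> None:
    """Mark an archive as fully downloaded only if the download actually succeeded."""
    if download_ok:
        progress[archive_name] = (started_at, True)

progress: Dict[str, Tuple[float, bool]] = {}
record_download(progress, "some_ext.tar.zst", 1234.5, download_ok=False)
assert "some_ext.tar.zst" not in progress        # failed download leaves no "done" marker
record_download(progress, "some_ext.tar.zst", 1240.0, download_ok=True)
assert progress["some_ext.tar.zst"] == (1240.0, True)
```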
@@ -743,21 +743,24 @@ pub fn handle_extension_neon(client: &mut Client) -> Result<()> {
     // which may happen in two cases:
     // - extension was just installed
     // - extension was already installed and is up to date
-    // DISABLED due to compute node unpinning epic
-    // let query = "ALTER EXTENSION neon UPDATE";
-    // info!("update neon extension version with query: {}", query);
-    // client.simple_query(query)?;
+    let query = "ALTER EXTENSION neon UPDATE";
+    info!("update neon extension version with query: {}", query);
+    if let Err(e) = client.simple_query(query) {
+        error!(
+            "failed to upgrade neon extension during `handle_extension_neon`: {}",
+            e
+        );
+    }

     Ok(())
 }

 #[instrument(skip_all)]
-pub fn handle_neon_extension_upgrade(_client: &mut Client) -> Result<()> {
-    info!("handle neon extension upgrade (not really)");
-    // DISABLED due to compute node unpinning epic
-    // let query = "ALTER EXTENSION neon UPDATE";
-    // info!("update neon extension version with query: {}", query);
-    // client.simple_query(query)?;
+pub fn handle_neon_extension_upgrade(client: &mut Client) -> Result<()> {
+    info!("handle neon extension upgrade");
+    let query = "ALTER EXTENSION neon UPDATE";
+    info!("update neon extension version with query: {}", query);
+    client.simple_query(query)?;

     Ok(())
 }
@@ -14,7 +14,6 @@ use utils::{

 use crate::service::Config;

-const BUSY_DELAY: Duration = Duration::from_secs(1);
 const SLOWDOWN_DELAY: Duration = Duration::from_secs(5);

 pub(crate) const API_CONCURRENCY: usize = 32;
@@ -280,11 +279,10 @@ impl ComputeHook {
             Err(NotifyError::SlowDown)
         }
         StatusCode::LOCKED => {
-            // Delay our retry if busy: the usual fast exponential backoff in backoff::retry
-            // is not appropriate
-            tokio::time::timeout(BUSY_DELAY, cancel.cancelled())
-                .await
-                .ok();
+            // We consider this fatal, because it's possible that the operation blocking the control one is
+            // also the one that is waiting for this reconcile. We should let the reconciler calling
+            // this hook fail, to give control plane a chance to un-lock.
+            tracing::info!("Control plane reports tenant is locked, dropping out of notify");
             Err(NotifyError::Busy)
         }
         StatusCode::SERVICE_UNAVAILABLE
@@ -306,7 +304,12 @@ impl ComputeHook {
         let client = reqwest::Client::new();
         backoff::retry(
             || self.do_notify_iteration(&client, url, &reconfigure_request, cancel),
-            |e| matches!(e, NotifyError::Fatal(_) | NotifyError::Unexpected(_)),
+            |e| {
+                matches!(
+                    e,
+                    NotifyError::Fatal(_) | NotifyError::Unexpected(_) | NotifyError::Busy
+                )
+            },
             3,
             10,
             "Send compute notification",
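With this change, `Busy` joins `Fatal` and `Unexpected` as non-retryable, so a locked tenant makes the notification fail quickly instead of being retried with a delay. A generic Python sketch of that retry shape (hypothetical error classes, not the storage controller's API):

```python
import time

class Fatal(Exception): ...
class Unexpected(Exception): ...
class Busy(Exception): ...          # e.g. the control plane reports the tenant is locked
class SlowDown(Exception): ...      # transient; worth retrying

def retry(op, max_attempts=3, base_delay=0.1):
    """Retry `op` on transient errors, but give up immediately on non-retryable ones."""
    for attempt in range(1, max_attempts + 1):
        try:
            return op()
        except (Fatal, Unexpected, Busy):
            raise                                    # non-retryable: surface to the caller
        except SlowDown:
            if attempt == max_attempts:
                raise                                # out of attempts
            time.sleep(base_delay * 2 ** attempt)    # simple exponential backoff
```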
@@ -37,6 +37,9 @@ pub(crate) struct StorageControllerMetricGroup {
|
||||
pub(crate) storage_controller_reconcile_complete:
|
||||
measured::CounterVec<ReconcileCompleteLabelGroupSet>,
|
||||
|
||||
/// Count of how many times we make an optimization change to a tenant's scheduling
|
||||
pub(crate) storage_controller_schedule_optimization: measured::Counter,
|
||||
|
||||
/// HTTP request status counters for handled requests
|
||||
pub(crate) storage_controller_http_request_status:
|
||||
measured::CounterVec<HttpRequestStatusLabelGroupSet>,
|
||||
@@ -101,6 +104,7 @@ impl StorageControllerMetricGroup {
|
||||
status: StaticLabelSet::new(),
|
||||
},
|
||||
),
|
||||
storage_controller_schedule_optimization: measured::Counter::new(),
|
||||
storage_controller_http_request_status: measured::CounterVec::new(
|
||||
HttpRequestStatusLabelGroupSet {
|
||||
path: lasso::ThreadedRodeo::new(),
|
||||
|
||||
@@ -101,15 +101,6 @@ impl PageserverClient {
|
||||
)
|
||||
}
|
||||
|
||||
pub(crate) async fn tenant_heatmap_upload(&self, tenant_id: TenantShardId) -> Result<()> {
|
||||
measured_request!(
|
||||
"tenant_heatmap_upload",
|
||||
crate::metrics::Method::Post,
|
||||
&self.node_id_label,
|
||||
self.inner.tenant_heatmap_upload(tenant_id).await
|
||||
)
|
||||
}
|
||||
|
||||
pub(crate) async fn location_config(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
|
||||
@@ -487,6 +487,7 @@ impl Reconciler {
|
||||
while let Err(e) = self.compute_notify().await {
|
||||
match e {
|
||||
NotifyError::Fatal(_) => return Err(ReconcileError::Notify(e)),
|
||||
NotifyError::ShuttingDown => return Err(ReconcileError::Cancel),
|
||||
_ => {
|
||||
tracing::warn!(
|
||||
"Live migration blocked by compute notification error, retrying: {e}"
|
||||
|
||||
@@ -288,8 +288,15 @@ impl Scheduler {
         node.and_then(|(node_id, may_schedule)| if may_schedule { Some(node_id) } else { None })
     }

-    /// Hard Exclude: only consider nodes not in this list.
-    /// Soft exclude: only use nodes in this list if no others are available.
+    /// hard_exclude: it is forbidden to use nodes in this list, typically because they
+    /// are already in use by this shard -- we use this to avoid picking the same node
+    /// as both attached and secondary location. This is a hard constraint: if we cannot
+    /// find any nodes that aren't in this list, then we will return a [`ScheduleError::ImpossibleConstraint`].
+    ///
+    /// context: we prefer to avoid using nodes identified in the context, according
+    /// to their anti-affinity score. We use this to prefer not to place shards in
+    /// the same tenant on the same node. This is a soft constraint: the context will never
+    /// cause us to fail to schedule a shard.
     pub(crate) fn schedule_shard(
         &self,
         hard_exclude: &[NodeId],
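The doc comment above distinguishes a hard constraint (never pick an excluded node) from a soft one (prefer nodes with a lower anti-affinity score). A compact Python sketch of that selection rule, with hypothetical data shapes rather than the scheduler's real types:

```python
from typing import Dict, List, Optional

def schedule_shard(node_ids: List[int],
                   hard_exclude: List[int],
                   context_scores: Dict[int, int]) -> Optional[int]:
    """Pick a node: excluded nodes are forbidden; context scores are only a preference."""
    candidates = [n for n in node_ids if n not in hard_exclude]
    if not candidates:
        return None  # corresponds to the ImpossibleConstraint error case
    # Soft constraint: prefer the lowest anti-affinity score, break ties by node id.
    return min(candidates, key=lambda n: (context_scores.get(n, 0), n))

assert schedule_shard([1, 2, 3], hard_exclude=[1], context_scores={2: 5, 3: 0}) == 3
assert schedule_shard([1], hard_exclude=[1], context_scores={}) is None
```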
@@ -3409,14 +3409,6 @@ impl Service {
|
||||
.join(",")
|
||||
);
|
||||
|
||||
// Optimization: publish heatmaps immediately, so that secondary locations can start warming up.
|
||||
for child in child_ids {
|
||||
if let Err(e) = client.tenant_heatmap_upload(*child).await {
|
||||
// Non-fatal, this is just an optimization
|
||||
tracing::warn!("Failed to upload child {child} heatmap: {e}");
|
||||
}
|
||||
}
|
||||
|
||||
if &response.new_shards != child_ids {
|
||||
// This should never happen: the pageserver should agree with us on how shard splits work.
|
||||
return Err(ApiError::InternalServerError(anyhow::anyhow!(
|
||||
@@ -3972,9 +3964,6 @@ impl Service {
|
||||
/// Helper for methods that will try and call pageserver APIs for
|
||||
/// a tenant, such as timeline CRUD: they cannot proceed unless the tenant
|
||||
/// is attached somewhere.
|
||||
///
|
||||
/// TODO: this doesn't actually ensure attached unless the PlacementPolicy is
|
||||
/// an attached policy. We should error out if it isn't.
|
||||
fn ensure_attached_schedule(
|
||||
&self,
|
||||
mut locked: std::sync::RwLockWriteGuard<'_, ServiceState>,
|
||||
@@ -3984,10 +3973,26 @@ impl Service {
|
||||
let (nodes, tenants, scheduler) = locked.parts_mut();
|
||||
|
||||
let mut schedule_context = ScheduleContext::default();
|
||||
for (_tenant_shard_id, shard) in tenants.range_mut(TenantShardId::tenant_range(tenant_id)) {
|
||||
for (tenant_shard_id, shard) in tenants.range_mut(TenantShardId::tenant_range(tenant_id)) {
|
||||
shard.schedule(scheduler, &mut schedule_context)?;
|
||||
|
||||
// The shard's policies may not result in an attached location being scheduled: this
|
||||
// is an error because our caller needs it attached somewhere.
|
||||
if shard.intent.get_attached().is_none() {
|
||||
return Err(anyhow::anyhow!(
|
||||
"Tenant {tenant_id} not scheduled to be attached"
|
||||
));
|
||||
};
|
||||
|
||||
if shard.stably_attached().is_some() {
|
||||
// We do not require the shard to be totally up to date on reconciliation: we just require
|
||||
// that it has been attached on the intended node. Other dirty state such as unattached secondary
|
||||
// locations, or compute hook notifications can be ignored.
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Some(waiter) = self.maybe_reconcile_shard(shard, nodes) {
|
||||
tracing::info!("Waiting for shard {tenant_shard_id} to reconcile, in order to ensure it is attached");
|
||||
waiters.push(waiter);
|
||||
}
|
||||
}
|
||||
@@ -4113,6 +4118,18 @@ impl Service {
|
||||
break;
|
||||
}
|
||||
|
||||
match shard.get_scheduling_policy() {
|
||||
ShardSchedulingPolicy::Active => {
|
||||
// Ok to do optimization
|
||||
}
|
||||
ShardSchedulingPolicy::Essential
|
||||
| ShardSchedulingPolicy::Pause
|
||||
| ShardSchedulingPolicy::Stop => {
|
||||
// Policy prevents optimizing this shard.
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
// Accumulate the schedule context for all the shards in a tenant: we must have
|
||||
// the total view of all shards before we can try to optimize any of them.
|
||||
schedule_context.avoid(&shard.intent.all_pageservers());
|
||||
@@ -4131,11 +4148,12 @@ impl Service {
|
||||
continue;
|
||||
}
|
||||
|
||||
if tenant_shards
|
||||
.iter()
|
||||
.any(|s| !matches!(s.splitting, SplitState::Idle))
|
||||
{
|
||||
// Never attempt to optimize a tenant that is currently being split
|
||||
if tenant_shards.iter().any(|s| {
|
||||
!matches!(s.splitting, SplitState::Idle)
|
||||
|| matches!(s.policy, PlacementPolicy::Detached)
|
||||
}) {
|
||||
// Never attempt to optimize a tenant that is currently being split, or
|
||||
// a tenant that is meant to be detached
|
||||
continue;
|
||||
}
|
||||
|
||||
|
||||
@@ -723,6 +723,11 @@ impl TenantState {
|
||||
scheduler: &mut Scheduler,
|
||||
optimization: ScheduleOptimization,
|
||||
) {
|
||||
metrics::METRICS_REGISTRY
|
||||
.metrics_group
|
||||
.storage_controller_schedule_optimization
|
||||
.inc();
|
||||
|
||||
match optimization {
|
||||
ScheduleOptimization::MigrateAttachment(MigrateAttachment {
|
||||
old_attached_node_id,
|
||||
@@ -1080,6 +1085,10 @@ impl TenantState {
|
||||
self.scheduling_policy = p;
|
||||
}
|
||||
|
||||
pub(crate) fn get_scheduling_policy(&self) -> &ShardSchedulingPolicy {
|
||||
&self.scheduling_policy
|
||||
}
|
||||
|
||||
pub(crate) fn from_persistent(
|
||||
tsp: TenantShardPersistence,
|
||||
intent: IntentState,
|
||||
|
||||
@@ -389,6 +389,10 @@ impl PageServerNode {
|
||||
.remove("image_creation_threshold")
|
||||
.map(|x| x.parse::<usize>())
|
||||
.transpose()?,
|
||||
image_layer_creation_check_threshold: settings
|
||||
.remove("image_layer_creation_check_threshold")
|
||||
.map(|x| x.parse::<u8>())
|
||||
.transpose()?,
|
||||
pitr_interval: settings.remove("pitr_interval").map(|x| x.to_string()),
|
||||
walreceiver_connect_timeout: settings
|
||||
.remove("walreceiver_connect_timeout")
|
||||
@@ -501,6 +505,12 @@ impl PageServerNode {
|
||||
.map(|x| x.parse::<usize>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'image_creation_threshold' as non zero integer")?,
|
||||
image_layer_creation_check_threshold: settings
|
||||
.remove("image_layer_creation_check_threshold")
|
||||
.map(|x| x.parse::<u8>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'image_creation_check_threshold' as integer")?,
|
||||
|
||||
pitr_interval: settings.remove("pitr_interval").map(|x| x.to_string()),
|
||||
walreceiver_connect_timeout: settings
|
||||
.remove("walreceiver_connect_timeout")
|
||||
|
||||
54 docs/rfcs/033-merged-compute-image.md (new file)
@@ -0,0 +1,54 @@
## Merged compute image
https://github.com/neondatabase/neon/issues/6685

### Motivation:
It is hard to manage compute pools for 3 Postgres versions
(we have a compute image for each version of Postgres: currently 3 for neonVM and 3 for k8s pods; eventually we will have only neonVMs).
We can try putting all Postgres versions into a single image, which should dramatically improve pool usage.

### TODO
#### Compute code changes:

1. Create merged compute image https://github.com/neondatabase/neon/pull/6808
2. Pass compute version in spec from control-plane
3. Change the path to postgres in compute_ctl; currently it is not specified explicitly.
   `compute_ctl` has `pgbin` and `pgdata` arguments, but they are only used in tests.
4. Make changes to the custom_extension code - fix path handling.

#### Control-plane changes:
1. Pass compute version in spec from control-plane
2. Remove old logic of VM pools management

#### Prewarm changes:
Currently, for pooled VMs, we prewarm postgres to improve cold start speed:
```
// If this is a pooled VM, prewarm before starting HTTP server and becoming
// available for binding. Prewarming helps Postgres start quicker later,
// because QEMU will already have its memory allocated from the host, and
// the necessary binaries will already be cached.
```

Prewarm = initdb + start postgres + rm pgdata

Q: How should we do prewarm if we don't know in advance which version of postgres will be used?
I see two options (see the sketch after this list):
- use versioned pgdata directories and run prewarm operations for all existing versions;
- choose a "default_version" for each pooled VM and run prewarm for it. Try to start the compute in a pooled VM with a matching version; if none exists, spin up the compute in any available VM. Start will be slower, because it is not prewarmed.
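One way to read the first option: keep one pgdata directory per bundled major version and run the same initdb/start/stop/remove cycle for each. A rough Python sketch, assuming the versioned binaries live under `/usr/local/v{N}/bin` (the path layout and version list here are assumptions, not the actual image layout):

```python
import shutil
import subprocess
from pathlib import Path

PG_VERSIONS = ["14", "15", "16"]   # assumed set of bundled major versions

def prewarm_all(base: Path = Path("/var/db/postgres/prewarm")) -> None:
    """Prewarm = initdb + start postgres + stop + rm pgdata, once per bundled version."""
    for ver in PG_VERSIONS:
        bindir = Path(f"/usr/local/v{ver}/bin")        # assumed layout
        pgdata = base / f"v{ver}"
        subprocess.run([bindir / "initdb", "-D", pgdata], check=True)
        subprocess.run([bindir / "pg_ctl", "-D", pgdata, "-w", "start"], check=True)
        subprocess.run([bindir / "pg_ctl", "-D", pgdata, "-w", "-m", "fast", "stop"], check=True)
        shutil.rmtree(pgdata)                          # rm pgdata, as in the RFC
```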
#### Extensions support
To support the merged compute image (an image containing all supported versions of postgres),
we need to offload extensions from the image. We can implement this using the "custom extensions" mechanism.

Custom extensions changes:
1. We need to move all extensions from the main compute image to the build-custom-extensions repo.
2. We need to generate a spec for all public extensions and pass it to the compute image.
   The spec contains information about the files in the extension and their paths,
   and also the content of the control file. Currently it is set manually per-user, for the few users that use "rare" custom extensions. We need to improve spec passing.
   For public extensions, we can embed this spec into the compute image: use the artifact from the build-custom-extensions CI step and put it into the compute image.
3. We need to test the performance of extension downloading and ensure that it doesn't affect cold starts (with proxy the speed should be fine).
4. Note that in this task we are not trying to solve the extension versioning issue; we assume that all extensions are mapped to compute images 1-1 as they are now.

#### Test changes:
- This is general functionality and will be covered by e2e tests.
- We will need to add a test for extensions, to ensure that they are available for every new compute version (see the sketch below). We don't need to run extension regression tests here; just ensure that `CREATE EXTENSION ext;` works.
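For the last bullet, the smoke test can stay trivial: connect to a compute of each version and check that `CREATE EXTENSION` succeeds. A sketch using plain psycopg2 (the extension list and helper are hypothetical, not the real test harness):

```python
import psycopg2

PUBLIC_EXTENSIONS = ["pg_stat_statements", "postgis"]   # example list, not exhaustive

def check_extensions(connstr: str) -> None:
    """Ensure each public extension can at least be created on this compute version."""
    with psycopg2.connect(connstr) as conn:
        conn.autocommit = True
        with conn.cursor() as cur:
            for ext in PUBLIC_EXTENSIONS:
                cur.execute(f'CREATE EXTENSION IF NOT EXISTS "{ext}"')
                cur.execute(f'DROP EXTENSION "{ext}"')
```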
@@ -301,6 +301,7 @@ pub struct TenantConfig {
|
||||
pub heatmap_period: Option<String>,
|
||||
pub lazy_slru_download: Option<bool>,
|
||||
pub timeline_get_throttle: Option<ThrottleConfig>,
|
||||
pub image_layer_creation_check_threshold: Option<u8>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
|
||||
@@ -271,17 +271,6 @@ impl Client {
|
||||
Ok((status, progress))
|
||||
}
|
||||
|
||||
pub async fn tenant_heatmap_upload(&self, tenant_id: TenantShardId) -> Result<()> {
|
||||
let path = reqwest::Url::parse(&format!(
|
||||
"{}/v1/tenant/{}/heatmap_upload",
|
||||
self.mgmt_api_endpoint, tenant_id
|
||||
))
|
||||
.expect("Cannot build URL");
|
||||
|
||||
self.request(Method::POST, path, ()).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn location_config(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
|
||||
@@ -3653,6 +3653,9 @@ pub(crate) mod harness {
|
||||
heatmap_period: Some(tenant_conf.heatmap_period),
|
||||
lazy_slru_download: Some(tenant_conf.lazy_slru_download),
|
||||
timeline_get_throttle: Some(tenant_conf.timeline_get_throttle),
|
||||
image_layer_creation_check_threshold: Some(
|
||||
tenant_conf.image_layer_creation_check_threshold,
|
||||
),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -57,6 +57,9 @@ pub mod defaults {
|
||||
// throughputs up to 1GiB/s per timeline.
|
||||
pub const DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG: u64 = 1024 * 1024 * 1024;
|
||||
pub const DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD: &str = "24 hour";
|
||||
// By default ingest enough WAL for two new L0 layers before checking if new image
|
||||
// image layers should be created.
|
||||
pub const DEFAULT_IMAGE_LAYER_CREATION_CHECK_THRESHOLD: u8 = 2;
|
||||
|
||||
pub const DEFAULT_INGEST_BATCH_SIZE: u64 = 100;
|
||||
}
|
||||
@@ -362,6 +365,10 @@ pub struct TenantConf {
|
||||
pub lazy_slru_download: bool,
|
||||
|
||||
pub timeline_get_throttle: pageserver_api::models::ThrottleConfig,
|
||||
|
||||
// How much WAL must be ingested before checking again whether a new image layer is required.
|
||||
// Expressed in multiples of checkpoint distance.
|
||||
pub image_layer_creation_check_threshold: u8,
|
||||
}
|
||||
|
||||
/// Same as TenantConf, but this struct preserves the information about
|
||||
@@ -454,6 +461,9 @@ pub struct TenantConfOpt {
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub timeline_get_throttle: Option<pageserver_api::models::ThrottleConfig>,
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub image_layer_creation_check_threshold: Option<u8>,
|
||||
}
|
||||
|
||||
impl TenantConfOpt {
|
||||
@@ -508,6 +518,9 @@ impl TenantConfOpt {
|
||||
.timeline_get_throttle
|
||||
.clone()
|
||||
.unwrap_or(global_conf.timeline_get_throttle),
|
||||
image_layer_creation_check_threshold: self
|
||||
.image_layer_creation_check_threshold
|
||||
.unwrap_or(global_conf.image_layer_creation_check_threshold),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -548,6 +561,7 @@ impl Default for TenantConf {
|
||||
heatmap_period: Duration::ZERO,
|
||||
lazy_slru_download: false,
|
||||
timeline_get_throttle: crate::tenant::throttle::Config::disabled(),
|
||||
image_layer_creation_check_threshold: DEFAULT_IMAGE_LAYER_CREATION_CHECK_THRESHOLD,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -621,6 +635,7 @@ impl From<TenantConfOpt> for models::TenantConfig {
|
||||
heatmap_period: value.heatmap_period.map(humantime),
|
||||
lazy_slru_download: value.lazy_slru_download,
|
||||
timeline_get_throttle: value.timeline_get_throttle.map(ThrottleConfig::from),
|
||||
image_layer_creation_check_threshold: value.image_layer_creation_check_threshold,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -309,6 +309,8 @@ pub struct Timeline {
|
||||
/// Configuration: how often should the partitioning be recalculated.
|
||||
repartition_threshold: u64,
|
||||
|
||||
last_image_layer_creation_check_at: AtomicLsn,
|
||||
|
||||
/// Current logical size of the "datadir", at the last LSN.
|
||||
current_logical_size: LogicalSize,
|
||||
|
||||
@@ -1632,6 +1634,15 @@ impl Timeline {
|
||||
.unwrap_or(default_tenant_conf.evictions_low_residence_duration_metric_threshold)
|
||||
}
|
||||
|
||||
fn get_image_layer_creation_check_threshold(&self) -> u8 {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
|
||||
tenant_conf.image_layer_creation_check_threshold.unwrap_or(
|
||||
self.conf
|
||||
.default_tenant_conf
|
||||
.image_layer_creation_check_threshold,
|
||||
)
|
||||
}
|
||||
|
||||
pub(super) fn tenant_conf_updated(&self) {
|
||||
// NB: Most tenant conf options are read by background loops, so,
|
||||
// changes will automatically be picked up.
|
||||
@@ -1769,6 +1780,7 @@ impl Timeline {
|
||||
},
|
||||
partitioning: tokio::sync::Mutex::new((KeyPartitioning::new(), Lsn(0))),
|
||||
repartition_threshold: 0,
|
||||
last_image_layer_creation_check_at: AtomicLsn::new(0),
|
||||
|
||||
last_received_wal: Mutex::new(None),
|
||||
rel_size_cache: RwLock::new(HashMap::new()),
|
||||
@@ -1797,6 +1809,7 @@ impl Timeline {
|
||||
};
|
||||
result.repartition_threshold =
|
||||
result.get_checkpoint_distance() / REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE;
|
||||
|
||||
result
|
||||
.metrics
|
||||
.last_record_gauge
|
||||
@@ -3501,6 +3514,24 @@ impl Timeline {

     // Is it time to create a new image layer for the given partition?
     async fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> bool {
+        let last = self.last_image_layer_creation_check_at.load();
+        if lsn != Lsn(0) {
+            let distance = lsn
+                .checked_sub(last)
+                .expect("Attempt to compact with LSN going backwards");
+
+            let min_distance = self.get_image_layer_creation_check_threshold() as u64
+                * self.get_checkpoint_distance();
+
+            // Skip the expensive delta layer counting below if we've not ingested
+            // sufficient WAL since the last check.
+            if distance.0 < min_distance {
+                return false;
+            }
+        }
+
+        self.last_image_layer_creation_check_at.store(lsn);
+
         let threshold = self.get_image_creation_threshold();

         let guard = self.layers.read().await;
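The check added above gates the expensive layer-map walk on how much WAL has been ingested since the last check, using `image_layer_creation_check_threshold` multiplied by the checkpoint distance. A minimal Python sketch of the same gating pattern (hypothetical names; LSNs are plain integers here):

```python
class ImageLayerCheckGate:
    """Skip an expensive check until enough WAL has been ingested since the last one."""

    def __init__(self, check_threshold: int, checkpoint_distance: int):
        self.min_distance = check_threshold * checkpoint_distance
        self.last_check_at = 0  # LSN at which we last ran the expensive check

    def should_check(self, lsn: int) -> bool:
        if lsn != 0 and lsn - self.last_check_at < self.min_distance:
            return False                 # not enough WAL since the last check: skip
        self.last_check_at = lsn
        return True

gate = ImageLayerCheckGate(check_threshold=2, checkpoint_distance=256 * 1024 * 1024)
assert gate.should_check(0) is True               # LSN 0 always falls through to the check
assert gate.should_check(100) is False            # too little WAL ingested since last check
assert gate.should_check(600 * 1024 * 1024)       # past the threshold: run the check
```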
@@ -1688,7 +1688,7 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum)
|
||||
break;
|
||||
|
||||
default:
|
||||
neon_log(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag);
|
||||
neon_log(ERROR, "unexpected response from page server with tag 0x%02x in neon_exists", resp->tag);
|
||||
}
|
||||
pfree(resp);
|
||||
return exists;
|
||||
@@ -2224,7 +2224,7 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
((NeonErrorResponse *) resp)->message)));
|
||||
break;
|
||||
default:
|
||||
neon_log(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag);
|
||||
neon_log(ERROR, "unexpected response from page server with tag 0x%02x in neon_read_at_lsn", resp->tag);
|
||||
}
|
||||
|
||||
/* buffer was used, clean up for later reuse */
|
||||
@@ -2497,7 +2497,7 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum)
|
||||
break;
|
||||
|
||||
default:
|
||||
neon_log(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag);
|
||||
neon_log(ERROR, "unexpected response from page server with tag 0x%02x in neon_nblocks", resp->tag);
|
||||
}
|
||||
update_cached_relsize(InfoFromSMgrRel(reln), forknum, n_blocks);
|
||||
|
||||
@@ -2552,7 +2552,7 @@ neon_dbsize(Oid dbNode)
|
||||
break;
|
||||
|
||||
default:
|
||||
neon_log(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag);
|
||||
neon_log(ERROR, "unexpected response from page server with tag 0x%02x in neon_dbsize", resp->tag);
|
||||
}
|
||||
|
||||
neon_log(SmgrTrace, "neon_dbsize: db %u (request LSN %X/%08X): %ld bytes",
|
||||
@@ -2857,7 +2857,7 @@ neon_read_slru_segment(SMgrRelation reln, const char* path, int segno, void* buf
|
||||
break;
|
||||
|
||||
default:
|
||||
neon_log(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag);
|
||||
neon_log(ERROR, "unexpected response from page server with tag 0x%02x in neon_read_slru_segment", resp->tag);
|
||||
}
|
||||
pfree(resp);
|
||||
|
||||
|
||||
@@ -94,4 +94,5 @@ select = [
|
||||
"I", # isort
|
||||
"W", # pycodestyle
|
||||
"B", # bugbear
|
||||
"UP032", # f-string
|
||||
]
|
||||
|
||||
@@ -64,14 +64,14 @@ def subprocess_capture(capture_dir: str, cmd: List[str], **kwargs: Any) -> str:
|
||||
Returns basepath for files with captured output.
|
||||
"""
|
||||
assert isinstance(cmd, list)
|
||||
base = os.path.basename(cmd[0]) + "_{}".format(global_counter())
|
||||
base = f"{os.path.basename(cmd[0])}_{global_counter()}"
|
||||
basepath = os.path.join(capture_dir, base)
|
||||
stdout_filename = basepath + ".stdout"
|
||||
stderr_filename = basepath + ".stderr"
|
||||
|
||||
with open(stdout_filename, "w") as stdout_f:
|
||||
with open(stderr_filename, "w") as stderr_f:
|
||||
print('(capturing output to "{}.stdout")'.format(base))
|
||||
print(f'(capturing output to "{base}.stdout")')
|
||||
subprocess.run(cmd, **kwargs, stdout=stdout_f, stderr=stderr_f)
|
||||
|
||||
return basepath
|
||||
@@ -82,11 +82,9 @@ class PgBin:
|
||||
|
||||
def __init__(self, log_dir: Path, pg_distrib_dir, pg_version):
|
||||
self.log_dir = log_dir
|
||||
self.pg_bin_path = os.path.join(str(pg_distrib_dir), "v{}".format(pg_version), "bin")
|
||||
self.pg_bin_path = os.path.join(str(pg_distrib_dir), f"v{pg_version}", "bin")
|
||||
self.env = os.environ.copy()
|
||||
self.env["LD_LIBRARY_PATH"] = os.path.join(
|
||||
str(pg_distrib_dir), "v{}".format(pg_version), "lib"
|
||||
)
|
||||
self.env["LD_LIBRARY_PATH"] = os.path.join(str(pg_distrib_dir), f"v{pg_version}", "lib")
|
||||
|
||||
def _fixpath(self, command: List[str]):
|
||||
if "/" not in command[0]:
|
||||
@@ -110,7 +108,7 @@ class PgBin:
|
||||
"""
|
||||
|
||||
self._fixpath(command)
|
||||
print('Running command "{}"'.format(" ".join(command)))
|
||||
print(f'Running command "{" ".join(command)}"')
|
||||
env = self._build_env(env)
|
||||
subprocess.run(command, env=env, cwd=cwd, check=True)
|
||||
|
||||
@@ -128,7 +126,7 @@ class PgBin:
|
||||
"""
|
||||
|
||||
self._fixpath(command)
|
||||
print('Running command "{}"'.format(" ".join(command)))
|
||||
print(f'Running command "{" ".join(command)}"')
|
||||
env = self._build_env(env)
|
||||
return subprocess_capture(
|
||||
str(self.log_dir), command, env=env, cwd=cwd, check=True, **kwargs
|
||||
@@ -300,7 +298,7 @@ class NeonPageserverHttpClient(requests.Session):
|
||||
|
||||
def lsn_to_hex(num: int) -> str:
|
||||
"""Convert lsn from int to standard hex notation."""
|
||||
return "{:X}/{:X}".format(num >> 32, num & 0xFFFFFFFF)
|
||||
return f"{num >> 32:X}/{num & 0xFFFFFFFF:X}"
|
||||
|
||||
|
||||
def lsn_from_hex(lsn_hex: str) -> int:
|
||||
@@ -331,16 +329,12 @@ def wait_for_upload(
|
||||
if current_lsn >= lsn:
|
||||
return
|
||||
print(
|
||||
"waiting for remote_consistent_lsn to reach {}, now {}, iteration {}".format(
|
||||
lsn_to_hex(lsn), lsn_to_hex(current_lsn), i + 1
|
||||
)
|
||||
f"waiting for remote_consistent_lsn to reach {lsn_to_hex(lsn)}, now {lsn_to_hex(current_lsn)}, iteration {i + 1}"
|
||||
)
|
||||
time.sleep(1)
|
||||
|
||||
raise Exception(
|
||||
"timed out while waiting for remote_consistent_lsn to reach {}, was {}".format(
|
||||
lsn_to_hex(lsn), lsn_to_hex(current_lsn)
|
||||
)
|
||||
f"timed out while waiting for remote_consistent_lsn to reach {lsn_to_hex(lsn)}, was {lsn_to_hex(current_lsn)}"
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -482,20 +482,18 @@ def pytest_terminal_summary(
|
||||
terminalreporter.section("Benchmark results", "-")
|
||||
is_header_printed = True
|
||||
|
||||
terminalreporter.write(
|
||||
"{}.{}: ".format(test_report.head_line, recorded_property["name"])
|
||||
)
|
||||
terminalreporter.write(f"{test_report.head_line}.{recorded_property['name']}: ")
|
||||
unit = recorded_property["unit"]
|
||||
value = recorded_property["value"]
|
||||
if unit == "MB":
|
||||
terminalreporter.write("{0:,.0f}".format(value), green=True)
|
||||
terminalreporter.write(f"{value:,.0f}", green=True)
|
||||
elif unit in ("s", "ms") and isinstance(value, float):
|
||||
terminalreporter.write("{0:,.3f}".format(value), green=True)
|
||||
terminalreporter.write(f"{value:,.3f}", green=True)
|
||||
elif isinstance(value, float):
|
||||
terminalreporter.write("{0:,.4f}".format(value), green=True)
|
||||
terminalreporter.write(f"{value:,.4f}", green=True)
|
||||
else:
|
||||
terminalreporter.write(str(value), green=True)
|
||||
terminalreporter.line(" {}".format(unit))
|
||||
terminalreporter.line(f" {unit}")
|
||||
|
||||
result_entry.append(recorded_property)
|
||||
|
||||
|
||||
@@ -3605,7 +3605,7 @@ class Safekeeper:
|
||||
return self
|
||||
|
||||
def stop(self, immediate: bool = False) -> "Safekeeper":
|
||||
log.info("Stopping safekeeper {}".format(self.id))
|
||||
log.info(f"Stopping safekeeper {self.id}")
|
||||
self.env.neon_cli.safekeeper_stop(self.id, immediate)
|
||||
self.running = False
|
||||
return self
|
||||
@@ -4037,13 +4037,13 @@ def check_restored_datadir_content(test_output_dir: Path, env: NeonEnv, endpoint
|
||||
for f in mismatch:
|
||||
f1 = os.path.join(endpoint.pgdata_dir, f)
|
||||
f2 = os.path.join(restored_dir_path, f)
|
||||
stdout_filename = "{}.filediff".format(f2)
|
||||
stdout_filename = f"{f2}.filediff"
|
||||
|
||||
with open(stdout_filename, "w") as stdout_f:
|
||||
subprocess.run("xxd -b {} > {}.hex ".format(f1, f1), shell=True)
|
||||
subprocess.run("xxd -b {} > {}.hex ".format(f2, f2), shell=True)
|
||||
subprocess.run(f"xxd -b {f1} > {f1}.hex ", shell=True)
|
||||
subprocess.run(f"xxd -b {f2} > {f2}.hex ", shell=True)
|
||||
|
||||
cmd = "diff {}.hex {}.hex".format(f1, f2)
|
||||
cmd = f"diff {f1}.hex {f2}.hex"
|
||||
subprocess.run([cmd], stdout=stdout_f, shell=True)
|
||||
|
||||
assert (mismatch, error) == ([], [])
|
||||
|
||||
@@ -204,13 +204,11 @@ def wait_for_last_record_lsn(
|
||||
return current_lsn
|
||||
if i % 10 == 0:
|
||||
log.info(
|
||||
"{}/{} waiting for last_record_lsn to reach {}, now {}, iteration {}".format(
|
||||
tenant, timeline, lsn, current_lsn, i + 1
|
||||
)
|
||||
f"{tenant}/{timeline} waiting for last_record_lsn to reach {lsn}, now {current_lsn}, iteration {i + 1}"
|
||||
)
|
||||
time.sleep(0.1)
|
||||
raise Exception(
|
||||
"timed out while waiting for last_record_lsn to reach {}, was {}".format(lsn, current_lsn)
|
||||
f"timed out while waiting for last_record_lsn to reach {lsn}, was {current_lsn}"
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -125,19 +125,19 @@ async def run_update_loop_worker(ep: Endpoint, n_txns: int, idx: int):
|
||||
await conn.execute(f"ALTER TABLE {table} SET (autovacuum_enabled = false)")
|
||||
await conn.execute(f"INSERT INTO {table} VALUES (1, 0)")
|
||||
await conn.execute(
|
||||
f"""
|
||||
CREATE PROCEDURE updating{table}() as
|
||||
$$
|
||||
DECLARE
|
||||
i integer;
|
||||
BEGIN
|
||||
FOR i IN 1..{n_txns} LOOP
|
||||
UPDATE {table} SET x = x + 1 WHERE pk=1;
|
||||
COMMIT;
|
||||
END LOOP;
|
||||
END
|
||||
$$ LANGUAGE plpgsql
|
||||
"""
|
||||
CREATE PROCEDURE updating{0}() as
|
||||
$$
|
||||
DECLARE
|
||||
i integer;
|
||||
BEGIN
|
||||
FOR i IN 1..{1} LOOP
|
||||
UPDATE {0} SET x = x + 1 WHERE pk=1;
|
||||
COMMIT;
|
||||
END LOOP;
|
||||
END
|
||||
$$ LANGUAGE plpgsql
|
||||
""".format(table, n_txns)
|
||||
)
|
||||
await conn.execute("SET statement_timeout=0")
|
||||
await conn.execute(f"call updating{table}()")
|
||||
|
||||
@@ -78,7 +78,7 @@ def test_branch_creation_heavy_write(neon_compare: NeonCompare, n_branches: int)
|
||||
p = random.randint(0, i)
|
||||
|
||||
timer = timeit.default_timer()
|
||||
env.neon_cli.create_branch("b{}".format(i + 1), "b{}".format(p), tenant_id=tenant)
|
||||
env.neon_cli.create_branch(f"b{i + 1}", f"b{p}", tenant_id=tenant)
|
||||
dur = timeit.default_timer() - timer
|
||||
|
||||
log.info(f"Creating branch b{i+1} took {dur}s")
|
||||
|
||||
@@ -1,79 +0,0 @@
|
||||
import concurrent.futures
|
||||
import threading
|
||||
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
PgBin,
|
||||
)
|
||||
from fixtures.types import TenantId, TimelineId
|
||||
|
||||
|
||||
def test_sharding_split_big_tenant(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
|
||||
"""
|
||||
Check that splitting works as expected for a tenant with a reasonable amount of data, larger
|
||||
than we use in a typical test.
|
||||
"""
|
||||
neon_env_builder.num_pageservers = 4
|
||||
env = neon_env_builder.init_configs()
|
||||
neon_env_builder.start()
|
||||
tenant_id = TenantId.generate()
|
||||
timeline_id = TimelineId.generate()
|
||||
env.neon_cli.create_tenant(
|
||||
tenant_id, timeline_id, shard_count=1, placement_policy='{"Attached":1}'
|
||||
)
|
||||
|
||||
# TODO: a large scale/size
|
||||
expect_size = 100e6
|
||||
scale = 500
|
||||
|
||||
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
|
||||
with env.endpoints.create_start(
|
||||
"main",
|
||||
tenant_id=tenant_id,
|
||||
) as ep:
|
||||
options = "-cstatement_timeout=0 " + ep.default_options.get("options", "")
|
||||
connstr = ep.connstr(password=None, options=options)
|
||||
password = ep.default_options.get("password", None)
|
||||
environ = {}
|
||||
if password is not None:
|
||||
environ["PGPASSWORD"] = password
|
||||
args = ["pgbench", f"-s{scale}", "-i", "-I", "dtGvp", connstr]
|
||||
|
||||
# Write a lot of data into the tenant
|
||||
pg_bin.run(args, env=environ)
|
||||
|
||||
# Confirm that we have created a physical size as large as expected
|
||||
timeline_info = env.storage_controller.pageserver_api().timeline_detail(
|
||||
tenant_id, timeline_id
|
||||
)
|
||||
log.info(f"Timeline after init: {timeline_info}")
|
||||
assert timeline_info["current_physical_size"] > expect_size
|
||||
|
||||
background_job_duration = 30
|
||||
background_stop = threading.Event()
|
||||
|
||||
def background_load():
|
||||
while not background_stop.is_set():
|
||||
args = [
|
||||
"pgbench",
|
||||
"-N",
|
||||
"-c4",
|
||||
f"-T{background_job_duration}",
|
||||
"-P2",
|
||||
"--progress-timestamp",
|
||||
connstr,
|
||||
]
|
||||
pg_bin.run(args, env=environ)
|
||||
|
||||
bg_fut = executor.submit(background_load)
|
||||
|
||||
# Do a split while the endpoint is alive
|
||||
env.storage_controller.tenant_shard_split(tenant_id, shard_count=4)
|
||||
|
||||
# Pump the scheduler to do all the changes it would do in the background
|
||||
# after a shard split.
|
||||
env.storage_controller.reconcile_until_idle(timeout_secs=300)
|
||||
|
||||
background_stop.set()
|
||||
bg_fut.result(timeout=background_job_duration * 2)
|
||||
@@ -1,109 +0,0 @@
|
||||
import concurrent.futures
|
||||
import random
|
||||
import time
|
||||
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
)
|
||||
from fixtures.pageserver.http import PageserverHttpClient
|
||||
from fixtures.pg_version import PgVersion
|
||||
from fixtures.types import TenantId, TenantShardId, TimelineId
|
||||
|
||||
|
||||
def test_sharding_service_many_tenants(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
):
|
||||
"""
|
||||
Check that we cope well with a not-totally-trivial number of tenants.
|
||||
|
||||
This is checking for:
|
||||
- Obvious concurrency bugs from issuing many tenant creations/modifications
|
||||
concurrently.
|
||||
- Obvious scaling bugs like O(N^2) scaling that would be so slow that even
|
||||
a basic test starts failing from slowness.
|
||||
|
||||
This is _not_ a comprehensive scale test: just a basic sanity check that
|
||||
we don't fall over for a thousand shards.
|
||||
"""
|
||||
|
||||
neon_env_builder.num_pageservers = 5
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
# Total tenants
|
||||
tenant_count = 2000
|
||||
|
||||
# Shards per tenant
|
||||
shard_count = 2
|
||||
stripe_size = 1024
|
||||
|
||||
tenants = set(TenantId.generate() for _i in range(0, tenant_count))
|
||||
|
||||
virtual_ps_http = PageserverHttpClient(env.storage_controller_port, lambda: True)
|
||||
|
||||
# We use a fixed seed to make the test reproducible: we want a randomly
|
||||
# chosen order, but not to change the order every time we run the test.
|
||||
rng = random.Random(1234)
|
||||
|
||||
# We will create tenants directly via API, not via neon_local, to avoid any false
|
||||
# serialization of operations in neon_local (it e.g. loads/saves a config file on each call)
|
||||
with concurrent.futures.ThreadPoolExecutor() as executor:
|
||||
futs = []
|
||||
for tenant_id in tenants:
|
||||
f = executor.submit(
|
||||
env.storage_controller.tenant_create, tenant_id, shard_count, stripe_size
|
||||
)
|
||||
futs.append(f)
|
||||
|
||||
# Wait for creations to finish
|
||||
for f in futs:
|
||||
f.result()
|
||||
|
||||
# Generate a mixture of operations and dispatch them all concurrently
|
||||
futs = []
|
||||
for tenant_id in tenants:
|
||||
op = rng.choice([0, 1, 2])
|
||||
if op == 0:
|
||||
# A fan-out write operation to all shards in a tenant (timeline creation)
|
||||
f = executor.submit(
|
||||
virtual_ps_http.timeline_create,
|
||||
PgVersion.NOT_SET,
|
||||
tenant_id,
|
||||
TimelineId.generate(),
|
||||
)
|
||||
elif op == 1:
|
||||
# A reconciler operation: migrate a shard.
|
||||
shard_number = rng.randint(0, shard_count - 1)
|
||||
tenant_shard_id = TenantShardId(tenant_id, shard_number, shard_count)
|
||||
dest_ps_id = rng.choice([ps.id for ps in env.pageservers])
|
||||
f = executor.submit(
|
||||
env.storage_controller.tenant_shard_migrate, tenant_shard_id, dest_ps_id
|
||||
)
|
||||
elif op == 2:
|
||||
# A passthrough read to shard zero
|
||||
f = executor.submit(virtual_ps_http.tenant_status, tenant_id)
|
||||
|
||||
futs.append(f)
|
||||
|
||||
# Wait for mixed ops to finish
|
||||
for f in futs:
|
||||
f.result()
|
||||
|
||||
# Rolling node failures: this is a small number of requests, but results in a large
|
||||
# number of scheduler calls and reconcile tasks.
|
||||
for pageserver in env.pageservers:
|
||||
env.storage_controller.node_configure(pageserver.id, {"availability": "Offline"})
|
||||
# The sleeps are just to make sure we aren't optimizing-away any re-scheduling operations
|
||||
# from a brief flap in node state.
|
||||
time.sleep(1)
|
||||
env.storage_controller.node_configure(pageserver.id, {"availability": "Active"})
|
||||
time.sleep(1)
|
||||
|
||||
# Restart the storage controller
|
||||
env.storage_controller.stop()
|
||||
env.storage_controller.start()
|
||||
|
||||
# Restart pageservers: this exercises the /re-attach API
|
||||
for pageserver in env.pageservers:
|
||||
pageserver.stop()
|
||||
pageserver.start()
|
||||
@@ -189,6 +189,7 @@ def test_fully_custom_config(positive_env: NeonEnv):
|
||||
},
|
||||
"trace_read_requests": True,
|
||||
"walreceiver_connect_timeout": "13m",
|
||||
"image_layer_creation_check_threshold": 1,
|
||||
}
|
||||
|
||||
ps_http = env.pageserver.http_client()
|
||||
|
||||
@@ -84,11 +84,11 @@ def test_branching_with_pgbench(
|
||||
threads = []
|
||||
|
||||
if ty == "cascade":
|
||||
env.neon_cli.create_branch("b{}".format(i + 1), "b{}".format(i), tenant_id=tenant)
|
||||
env.neon_cli.create_branch(f"b{i + 1}", f"b{i}", tenant_id=tenant)
|
||||
else:
|
||||
env.neon_cli.create_branch("b{}".format(i + 1), "b0", tenant_id=tenant)
|
||||
env.neon_cli.create_branch(f"b{i + 1}", "b0", tenant_id=tenant)
|
||||
|
||||
endpoints.append(env.endpoints.create_start("b{}".format(i + 1), tenant_id=tenant))
|
||||
endpoints.append(env.endpoints.create_start(f"b{i + 1}", tenant_id=tenant))
|
||||
|
||||
threads.append(
|
||||
threading.Thread(target=run_pgbench, args=(endpoints[-1].connstr(),), daemon=True)
|
||||
|
||||
@@ -74,8 +74,8 @@ def test_large_schema(neon_env_builder: NeonEnvBuilder):
|
||||
cur.execute("select * from pg_depend order by refclassid, refobjid, refobjsubid")
|
||||
|
||||
# Check layer file sizes
|
||||
timeline_path = "{}/tenants/{}/timelines/{}/".format(
|
||||
env.pageserver.workdir, env.initial_tenant, env.initial_timeline
|
||||
timeline_path = (
|
||||
f"{env.pageserver.workdir}/tenants/{env.initial_tenant}/timelines/{env.initial_timeline}/"
|
||||
)
|
||||
for filename in os.listdir(timeline_path):
|
||||
if filename.startswith("00000"):
|
||||
|
||||
@@ -57,9 +57,7 @@ def test_layer_bloating(neon_simple_env: NeonEnv, vanilla_pg):
|
||||
time.sleep(10)
|
||||
|
||||
# Check layer file sizes
|
||||
timeline_path = "{}/tenants/{}/timelines/{}/".format(
|
||||
env.pageserver.workdir, env.initial_tenant, timeline
|
||||
)
|
||||
timeline_path = f"{env.pageserver.workdir}/tenants/{env.initial_tenant}/timelines/{timeline}/"
|
||||
log.info(f"Check {timeline_path}")
|
||||
for filename in os.listdir(timeline_path):
|
||||
if filename.startswith("00000"):
|
||||
|
||||
@@ -165,6 +165,7 @@ def test_gc_of_remote_layers(neon_env_builder: NeonEnvBuilder):
|
||||
"compaction_threshold": "3",
|
||||
# "image_creation_threshold": set at runtime
|
||||
"compaction_target_size": f"{128 * (1024**2)}", # make it so that we only have 1 partition => image coverage for delta layers => enables gc of delta layers
|
||||
"image_layer_creation_check_threshold": "0", # always check if a new image layer can be created
|
||||
}
|
||||
|
||||
def tenant_update_config(changes):
|
||||
|
||||
@@ -53,6 +53,7 @@ def test_issue_5878(neon_env_builder: NeonEnvBuilder):
|
||||
"checkpoint_timeout": "24h", # something we won't reach
|
||||
"checkpoint_distance": f"{50 * (1024**2)}", # something we won't reach, we checkpoint manually
|
||||
"image_creation_threshold": "100", # we want to control when image is created
|
||||
"image_layer_creation_check_threshold": "0",
|
||||
"compaction_threshold": f"{l0_l1_threshold}",
|
||||
"compaction_target_size": f"{128 * (1024**3)}", # make it so that we only have 1 partition => image coverage for delta layers => enables gc of delta layers
|
||||
}
|
||||
|
||||
@@ -568,6 +568,8 @@ def test_compaction_downloads_on_demand_with_image_creation(neon_env_builder: Ne
|
||||
"image_creation_threshold": 100,
|
||||
# repartitioning parameter, unused
|
||||
"compaction_target_size": 128 * 1024**2,
|
||||
# Always check if a new image layer can be created
|
||||
"image_layer_creation_check_threshold": 0,
|
||||
# pitr_interval and gc_horizon are not interesting because we dont run gc
|
||||
}
|
||||
|
||||
@@ -632,7 +634,8 @@ def test_compaction_downloads_on_demand_with_image_creation(neon_env_builder: Ne
|
||||
# threshold to expose image creation to downloading all of the needed
|
||||
# layers -- threshold of 2 would sound more reasonable, but keeping it as 1
|
||||
# to be less flaky
|
||||
env.neon_cli.config_tenant(tenant_id, {"image_creation_threshold": "1"})
|
||||
conf["image_creation_threshold"] = "1"
|
||||
env.neon_cli.config_tenant(tenant_id, {k: str(v) for k, v in conf.items()})
|
||||
|
||||
pageserver_http.timeline_compact(tenant_id, timeline_id)
|
||||
layers = pageserver_http.layer_map_info(tenant_id, timeline_id)
|
||||
|
||||
@@ -9,7 +9,6 @@ of the pageserver are:
|
||||
- Updates to remote_consistent_lsn may only be made visible after validating generation
|
||||
"""
|
||||
|
||||
|
||||
import enum
|
||||
import re
|
||||
import time
|
||||
@@ -53,6 +52,7 @@ TENANT_CONF = {
|
||||
"compaction_period": "0s",
|
||||
# create image layers eagerly, so that GC can remove some layers
|
||||
"image_creation_threshold": "1",
|
||||
"image_layer_creation_check_threshold": "0",
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -22,7 +22,7 @@ def test_read_validation(neon_simple_env: NeonEnv):
|
||||
with closing(endpoint.connect()) as con:
|
||||
with con.cursor() as c:
|
||||
for e in extensions:
|
||||
c.execute("create extension if not exists {};".format(e))
|
||||
c.execute(f"create extension if not exists {e};")
|
||||
|
||||
c.execute("create table foo (c int) with (autovacuum_enabled = false)")
|
||||
c.execute("insert into foo values (1)")
|
||||
@@ -42,14 +42,12 @@ def test_read_validation(neon_simple_env: NeonEnv):
|
||||
log.info("Test table is populated, validating buffer cache")
|
||||
|
||||
cache_entries = query_scalar(
|
||||
c, "select count(*) from pg_buffercache where relfilenode = {}".format(relfilenode)
|
||||
c, f"select count(*) from pg_buffercache where relfilenode = {relfilenode}"
|
||||
)
|
||||
assert cache_entries > 0, "No buffers cached for the test relation"
|
||||
|
||||
c.execute(
|
||||
"select reltablespace, reldatabase, relfilenode from pg_buffercache where relfilenode = {}".format(
|
||||
relfilenode
|
||||
)
|
||||
f"select reltablespace, reldatabase, relfilenode from pg_buffercache where relfilenode = {relfilenode}"
|
||||
)
|
||||
reln = c.fetchone()
|
||||
assert reln is not None
|
||||
@@ -59,22 +57,20 @@ def test_read_validation(neon_simple_env: NeonEnv):
|
||||
c.execute("select clear_buffer_cache()")
|
||||
|
||||
cache_entries = query_scalar(
|
||||
c, "select count(*) from pg_buffercache where relfilenode = {}".format(relfilenode)
|
||||
c, f"select count(*) from pg_buffercache where relfilenode = {relfilenode}"
|
||||
)
|
||||
assert cache_entries == 0, "Failed to clear buffer cache"
|
||||
|
||||
log.info("Cache is clear, reading stale page version")
|
||||
|
||||
c.execute(
|
||||
"select lsn, lower, upper from page_header(get_raw_page_at_lsn('foo', 'main', 0, '{}'))".format(
|
||||
first[0]
|
||||
)
|
||||
f"select lsn, lower, upper from page_header(get_raw_page_at_lsn('foo', 'main', 0, '{first[0]}'))"
|
||||
)
|
||||
direct_first = c.fetchone()
|
||||
assert first == direct_first, "Failed fetch page at historic lsn"
|
||||
|
||||
cache_entries = query_scalar(
|
||||
c, "select count(*) from pg_buffercache where relfilenode = {}".format(relfilenode)
|
||||
c, f"select count(*) from pg_buffercache where relfilenode = {relfilenode}"
|
||||
)
|
||||
assert cache_entries == 0, "relation buffers detected after invalidation"
|
||||
|
||||
@@ -87,7 +83,7 @@ def test_read_validation(neon_simple_env: NeonEnv):
|
||||
assert second == direct_latest, "Failed fetch page at latest lsn"
|
||||
|
||||
cache_entries = query_scalar(
c, "select count(*) from pg_buffercache where relfilenode = {}".format(relfilenode)
c, f"select count(*) from pg_buffercache where relfilenode = {relfilenode}"
)
assert cache_entries == 0, "relation buffers detected after invalidation"

@@ -96,9 +92,7 @@ def test_read_validation(neon_simple_env: NeonEnv):
)

c.execute(
"select lsn, lower, upper from page_header(get_raw_page_at_lsn( {}, {}, {}, 0, 0, '{}' ))".format(
reln[0], reln[1], reln[2], first[0]
)
f"select lsn, lower, upper from page_header(get_raw_page_at_lsn({reln[0]}, {reln[1]}, {reln[2]}, 0, 0, '{first[0]}'))"
)
direct_first = c.fetchone()
assert first == direct_first, "Failed fetch page at historic lsn using oid"
@@ -108,9 +102,7 @@ def test_read_validation(neon_simple_env: NeonEnv):
)

c.execute(
"select lsn, lower, upper from page_header(get_raw_page_at_lsn( {}, {}, {}, 0, 0, NULL ))".format(
reln[0], reln[1], reln[2]
)
f"select lsn, lower, upper from page_header(get_raw_page_at_lsn({reln[0]}, {reln[1]}, {reln[2]}, 0, 0, NULL))"
)
direct_latest = c.fetchone()
assert second == direct_latest, "Failed fetch page at latest lsn"
@@ -122,9 +114,7 @@ def test_read_validation(neon_simple_env: NeonEnv):
)

c.execute(
"select lsn, lower, upper from page_header(get_raw_page_at_lsn( {}, {}, {}, 0, 0, '{}' ))".format(
reln[0], reln[1], reln[2], first[0]
)
f"select lsn, lower, upper from page_header(get_raw_page_at_lsn({reln[0]}, {reln[1]}, {reln[2]}, 0, 0, '{first[0]}'))"
)
direct_first = c.fetchone()
assert first == direct_first, "Failed fetch page at historic lsn using oid"
@@ -134,7 +124,7 @@ def test_read_validation(neon_simple_env: NeonEnv):
c.execute("select * from page_header(get_raw_page('foo', 'main', 0));")
raise AssertionError("query should have failed")
except UndefinedTable as e:
log.info("Caught an expected failure: {}".format(e))
log.info(f"Caught an expected failure: {e}")


def test_read_validation_neg(neon_simple_env: NeonEnv):
@@ -148,7 +138,7 @@ def test_read_validation_neg(neon_simple_env: NeonEnv):
with closing(endpoint.connect()) as con:
with con.cursor() as c:
for e in extensions:
c.execute("create extension if not exists {};".format(e))
c.execute(f"create extension if not exists {e};")

log.info("read a page of a missing relation")
try:
@@ -157,7 +147,7 @@ def test_read_validation_neg(neon_simple_env: NeonEnv):
)
raise AssertionError("query should have failed")
except UndefinedTable as e:
log.info("Caught an expected failure: {}".format(e))
log.info(f"Caught an expected failure: {e}")

c.execute("create table foo (c int) with (autovacuum_enabled = false)")
c.execute("insert into foo values (1)")
@@ -169,7 +159,7 @@ def test_read_validation_neg(neon_simple_env: NeonEnv):
)
raise AssertionError("query should have failed")
except IoError as e:
log.info("Caught an expected failure: {}".format(e))
log.info(f"Caught an expected failure: {e}")

log.info("Pass NULL as an input")
expected = (None, None, None)

@@ -245,6 +245,7 @@ def test_remote_storage_upload_queue_retries(
"compaction_period": "0s",
# create image layers eagerly, so that GC can remove some layers
"image_creation_threshold": "1",
"image_layer_creation_check_threshold": "0",
}
)

@@ -1,4 +1,3 @@
import json
import os
import time
from collections import defaultdict
@@ -254,6 +253,10 @@ def test_sharding_split_smoke(
# The old parent shards should no longer exist on disk
assert not shards_on_disk(old_shard_ids)

# Enough background reconciliations should result in the shards being properly distributed.
# Run this before the workload, because its LSN-waiting code presumes stable locations.
env.storage_controller.reconcile_until_idle()

workload.validate()

workload.churn_rows(256)
@@ -267,14 +270,6 @@ def test_sharding_split_smoke(
pageserver.http_client().timeline_gc(tenant_shard_id, timeline_id, None)
workload.validate()

# Enough background reconciliations should result in the shards being properly distributed
env.storage_controller.reconcile_until_idle()

# We have 8 shards and 16 nodes
# Initially I expect 4 nodes to have 2 attached locations each, and another 8 nodes to have
# 1 secondary location each
# 2 2 2 2 1 1 1 1 1 1 1 1 0 0 0 0

# Assert on how many reconciles happened during the process. This is something of an
# implementation detail, but it is useful to detect any bugs that might generate spurious
# extra reconcile iterations.
@@ -333,6 +328,31 @@ def test_sharding_split_smoke(
assert sum(total.values()) == split_shard_count * 2
check_effective_tenant_config()

# More specific check: that we are fully balanced. This is deterministic because
# the order in which we consider shards for optimization is deterministic, and the
# order of preference of nodes is also deterministic (lower node IDs win).
log.info(f"total: {total}")
assert total == {
1: 1,
2: 1,
3: 1,
4: 1,
5: 1,
6: 1,
7: 1,
8: 1,
9: 1,
10: 1,
11: 1,
12: 1,
13: 1,
14: 1,
15: 1,
16: 1,
}
log.info(f"attached: {attached}")
assert attached == {1: 1, 2: 1, 3: 1, 5: 1, 6: 1, 7: 1, 9: 1, 11: 1}

# Ensure post-split pageserver locations survive a restart (i.e. the child shards
# correctly wrote config to disk, and the storage controller responds correctly
# to /re-attach)
@@ -391,6 +411,7 @@ def test_sharding_split_stripe_size(
env.storage_controller.tenant_shard_split(
tenant_id, shard_count=2, shard_stripe_size=new_stripe_size
)
env.storage_controller.reconcile_until_idle()

# Check that we ended up with the stripe size that we expected, both on the pageserver
# and in the notifications to compute
@@ -859,6 +880,7 @@ def test_sharding_split_failures(
# Having failed+rolled back, we should be able to split again
# No failures this time; it will succeed
env.storage_controller.tenant_shard_split(tenant_id, shard_count=split_shard_count)
env.storage_controller.reconcile_until_idle(timeout_secs=30)

workload.churn_rows(10)
workload.validate()
@@ -912,6 +934,10 @@ def test_sharding_split_failures(
finish_split()
assert_split_done()

# Having completed the split, pump the background reconciles to ensure that
# the scheduler reaches an idle state
env.storage_controller.reconcile_until_idle(timeout_secs=30)

env.storage_controller.consistency_check()

@@ -1040,82 +1066,3 @@ def test_sharding_backpressure(neon_env_builder: NeonEnvBuilder):
max_lsn = max(Lsn(info["last_record_lsn"]) for info in infos)
diff = max_lsn - min_lsn
assert diff < 2 * 1024 * 1024, f"LSN diff={diff}, expected diff < 2MB due to backpressure"
# Stripe sizes in number of pages.
TINY_STRIPES = 16
LARGE_STRIPES = 32768


@pytest.mark.parametrize("stripe_size", [TINY_STRIPES, LARGE_STRIPES])
def test_sharding_compaction(neon_env_builder: NeonEnvBuilder, stripe_size: int):
"""
Use small stripes, small layers, and small compaction thresholds to exercise how compaction
and image layer generation interacts with sharding.
"""

TENANT_CONF = {
# small checkpointing and compaction targets to ensure we generate many upload operations
"checkpoint_distance": f"{128 * 1024}",
"compaction_threshold": "1",
"compaction_target_size": f"{128 * 1024}",
# no PITR horizon, we specify the horizon when we request on-demand GC
"pitr_interval": "0s",
# disable background compaction and GC. We invoke it manually when we want it to happen.
"gc_period": "0s",
"compaction_period": "0s",
# create image layers eagerly: we want to exercise image layer creation in this test.
"image_creation_threshold": "1",
"image_layer_creation_check_threshold": 0,
}

neon_env_builder.num_pageservers = 4
env = neon_env_builder.init_start(
initial_tenant_conf=TENANT_CONF,
initial_tenant_shard_count=4,
initial_tenant_shard_stripe_size=stripe_size,
)

tenant_id = env.initial_tenant
timeline_id = env.initial_timeline

workload = Workload(env, tenant_id, timeline_id)
workload.init()
workload.write_rows(64)
for _i in range(0, 10):
# Each of these does some writes then a checkpoint: because we set image_creation_threshold to 1,
# these should result in image layers each time we write some data into a shard, and also shards
# receiving less data hitting their "empty image layer" path (where they should skip writing the layer,
# rather than asserting)
workload.churn_rows(64)

# Assert that we got some image layers: this is important because this test's purpose is to exercise the sharding changes
# to Timeline::create_image_layers, so if we weren't creating any image layers we wouldn't be doing our job.
shard_has_image_layers = []
for shard in env.storage_controller.locate(tenant_id):
pageserver = env.get_pageserver(shard["node_id"])
shard_id = shard["shard_id"]
layer_map = pageserver.http_client().layer_map_info(shard_id, timeline_id)
image_layer_sizes = {}
for layer in layer_map.historic_layers:
if layer.kind == "Image":
image_layer_sizes[layer.layer_file_name] = layer.layer_file_size

# Pageserver should assert rather than emit an empty layer file, but double check here
assert layer.layer_file_size is not None
assert layer.layer_file_size > 0

shard_has_image_layers.append(len(image_layer_sizes) > 1)
log.info(f"Shard {shard_id} layer sizes: {json.dumps(image_layer_sizes, indent=2)}")

# TODO: once keyspace partitioning is updated, assert that layer sizes are as expected
# (see https://github.com/neondatabase/neon/issues/6774)

if stripe_size == TINY_STRIPES:
# Expect writes were scattered across all pageservers: they should all have compacted some image layers
assert all(shard_has_image_layers)
else:
# With large stripes, it is expected that most of our writes went to one pageserver, so we just require
# that at least one of them has some image layers.
assert any(shard_has_image_layers)

# Assert that everything is still readable
workload.validate()

@@ -433,10 +433,13 @@ def test_sharding_service_compute_hook(
# Set up fake HTTP notify endpoint
notifications = []

handle_params = {"status": 200}

def handler(request: Request):
log.info(f"Notify request: {request}")
status = handle_params["status"]
log.info(f"Notify request[{status}]: {request}")
notifications.append(request.json)
return Response(status=200)
return Response(status=status)

httpserver.expect_request("/notify", method="PUT").respond_with_handler(handler)

@@ -504,6 +507,24 @@ def test_sharding_service_compute_hook(

wait_until(10, 1, received_split_notification)

# If the compute hook is unavailable, that should not block creating a tenant and
# creating a timeline. This simulates a control plane refusing to accept notifications
handle_params["status"] = 423
degraded_tenant_id = TenantId.generate()
degraded_timeline_id = TimelineId.generate()
env.storage_controller.tenant_create(degraded_tenant_id)
env.storage_controller.pageserver_api().timeline_create(
PgVersion.NOT_SET, degraded_tenant_id, degraded_timeline_id
)

# Ensure we hit the handler error path
env.storage_controller.allowed_errors.append(
".*Failed to notify compute of attached pageserver.*tenant busy.*"
)
env.storage_controller.allowed_errors.append(".*Reconcile error.*tenant busy.*")
assert notifications[-1] is not None
assert notifications[-1]["tenant_id"] == str(degraded_tenant_id)

env.storage_controller.consistency_check()

@@ -1,188 +0,0 @@
import concurrent.futures
import random
from collections import defaultdict

from fixtures.compute_reconfigure import ComputeReconfigure
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
)
from fixtures.types import TenantId, TenantShardId, TimelineId
from fixtures.utils import wait_until
from fixtures.workload import Workload


def get_node_shard_counts(env: NeonEnv, tenant_ids):
total: defaultdict[int, int] = defaultdict(int)
attached: defaultdict[int, int] = defaultdict(int)
for tid in tenant_ids:
for shard in env.storage_controller.tenant_describe(tid)["shards"]:
log.info(
f"{shard['tenant_shard_id']}: attached={shard['node_attached']}, secondary={shard['node_secondary']} "
)
for node in shard["node_secondary"]:
total[int(node)] += 1
attached[int(shard["node_attached"])] += 1
total[int(shard["node_attached"])] += 1

return total, attached


def test_storcon_rolling_failures(
neon_env_builder: NeonEnvBuilder,
compute_reconfigure_listener: ComputeReconfigure,
):
neon_env_builder.num_pageservers = 8

neon_env_builder.control_plane_compute_hook_api = (
compute_reconfigure_listener.control_plane_compute_hook_api
)

workloads: dict[TenantId, Workload] = {}

env = neon_env_builder.init_start()

for ps in env.pageservers:
# We will do unclean detaches
ps.allowed_errors.append(".*Dropped remote consistent LSN updates.*")

n_tenants = 32
tenants = [(env.initial_tenant, env.initial_timeline)]
for i in range(0, n_tenants - 1):
tenant_id = TenantId.generate()
timeline_id = TimelineId.generate()
shard_count = [1, 2, 4][i % 3]
env.neon_cli.create_tenant(
tenant_id, timeline_id, shard_count=shard_count, placement_policy='{"Double":1}'
)
tenants.append((tenant_id, timeline_id))

# Background pain:
# - TODO: some fraction of pageserver API requests hang
# (this requires implementing wrap of location_conf calls with proper timeline/cancel)
# - TODO: continuous tenant/timeline creation/destruction over a different ID range than
# the ones we're using for availability checks.

rng = random.Random(0xDEADBEEF)

for tenant_id, timeline_id in tenants:
workload = Workload(env, tenant_id, timeline_id)
compute_reconfigure_listener.register_workload(workload)
workloads[tenant_id] = workload

def node_evacuated(node_id: int):
total, attached = get_node_shard_counts(env, [t[0] for t in tenants])
assert attached[node_id] == 0

def attachments_active():
for tid, _tlid in tenants:
for shard in env.storage_controller.locate(tid):
psid = shard["node_id"]
tsid = TenantShardId.parse(shard["shard_id"])
status = env.get_pageserver(psid).http_client().tenant_status(tenant_id=tsid)
assert status["state"]["slug"] == "Active"
log.info(f"Shard {tsid} active on node {psid}")

failpoints = ("api-503", "5%1000*return(1)")
failpoints_str = f"{failpoints[0]}={failpoints[1]}"
for ps in env.pageservers:
ps.http_client().configure_failpoints(failpoints)

def for_all_workloads(callback, timeout=60):
futs = []
with concurrent.futures.ThreadPoolExecutor() as pool:
for _tenant_id, workload in workloads.items():
futs.append(pool.submit(callback, workload))

for f in futs:
f.result(timeout=timeout)

def clean_fail_restore():
"""
Clean shutdown of a node: mark it offline in storage controller, wait for new attachment
locations to activate, then SIGTERM it.
- Endpoints should not fail any queries
- New attach locations should activate within bounded time.
"""
victim = rng.choice(env.pageservers)
env.storage_controller.node_configure(victim.id, {"availability": "Offline"})

wait_until(10, 1, lambda node_id=victim.id: node_evacuated(node_id))  # type: ignore[misc]
wait_until(10, 1, attachments_active)

victim.stop(immediate=False)

traffic()

victim.start(extra_env_vars={"FAILPOINTS": failpoints_str})

# Revert shards to attach at their original locations
# TODO
# env.storage_controller.balance_attached()
wait_until(10, 1, attachments_active)

def hard_fail_restore():
"""
Simulate an unexpected death of a pageserver node
"""
victim = rng.choice(env.pageservers)
victim.stop(immediate=True)
# TODO: once we implement heartbeats detecting node failures, remove this
# explicit marking offline and rely on storage controller to detect it itself.
env.storage_controller.node_configure(victim.id, {"availability": "Offline"})
wait_until(10, 1, lambda node_id=victim.id: node_evacuated(node_id))  # type: ignore[misc]
wait_until(10, 1, attachments_active)
traffic()
victim.start(extra_env_vars={"FAILPOINTS": failpoints_str})
# TODO
# env.storage_controller.balance_attached()
wait_until(10, 1, attachments_active)

def traffic():
"""
Check that all tenants are working for postgres clients
"""

def exercise_one(workload):
workload.churn_rows(100)
workload.validate()

for_all_workloads(exercise_one)

def init_one(workload):
workload.init()
workload.write_rows(100)

for_all_workloads(init_one, timeout=60)

for i in range(0, 20):
mode = rng.choice([0, 1, 2])
log.info(f"Iteration {i}, mode {mode}")
if mode == 0:
# Traffic interval: sometimes, instead of a failure, just let the clients
# write a load of data. This avoids chaos tests ending up with unrealistically
# small quantities of data in flight.
traffic()
elif mode == 1:
clean_fail_restore()
elif mode == 2:
hard_fail_restore()

# Fail and restart: hard-kill one node. Notify the storage controller that it is offline.
# Success criteria:
# - New attach locations should activate within bounded time
# - TODO: once we do heartbeating, we should not have to explicitly mark the node offline

# TODO: fail and remove: fail a node, and remove it from the cluster.
# Success criteria:
# - Endpoints should not fail any queries
# - New attach locations should activate within bounded time
# - New secondary locations should fill up with data within bounded time

# TODO: somehow need to wait for reconciles to complete before doing consistency check
# (or make the check wait).

# Do consistency check on every iteration, not just at the end: this makes it more obvious
# which change caused an issue.
env.storage_controller.consistency_check()
@@ -103,9 +103,7 @@ def test_many_timelines(neon_env_builder: NeonEnvBuilder):

n_timelines = 3

branch_names = [
"test_safekeepers_many_timelines_{}".format(tlin) for tlin in range(n_timelines)
]
branch_names = [f"test_safekeepers_many_timelines_{tlin}" for tlin in range(n_timelines)]
# pageserver, safekeeper operate timelines via their ids (can be represented in hex as 'ad50847381e248feaac9876cc71ae418')
# that's not really human readable, so the branch names are introduced in Neon CLI.
# Neon CLI stores its branch <-> timeline mapping in its internals,
@@ -1136,13 +1134,13 @@ def cmp_sk_wal(sks: List[Safekeeper], tenant_id: TenantId, timeline_id: Timeline
for f in mismatch:
f1 = os.path.join(sk0.timeline_dir(tenant_id, timeline_id), f)
f2 = os.path.join(sk.timeline_dir(tenant_id, timeline_id), f)
stdout_filename = "{}.filediff".format(f2)
stdout_filename = f"{f2}.filediff"

with open(stdout_filename, "w") as stdout_f:
subprocess.run("xxd {} > {}.hex ".format(f1, f1), shell=True)
subprocess.run("xxd {} > {}.hex ".format(f2, f2), shell=True)
subprocess.run(f"xxd {f1} > {f1}.hex ", shell=True)
subprocess.run(f"xxd {f2} > {f2}.hex ", shell=True)

cmd = "diff {}.hex {}.hex".format(f1, f2)
cmd = f"diff {f1}.hex {f2}.hex"
subprocess.run([cmd], stdout=stdout_f, shell=True)

assert (mismatch, not_regular) == (

@@ -76,20 +76,20 @@ class WorkerStats(object):
self.counters[worker_id] += 1

def check_progress(self):
log.debug("Workers progress: {}".format(self.counters))
log.debug(f"Workers progress: {self.counters}")

# every worker should finish at least one tx
assert all(cnt > 0 for cnt in self.counters)

progress = sum(self.counters)
log.info("All workers made {} transactions".format(progress))
log.info(f"All workers made {progress} transactions")


async def run_random_worker(
stats: WorkerStats, endpoint: Endpoint, worker_id, n_accounts, max_transfer
):
pg_conn = await endpoint.connect_async()
log.debug("Started worker {}".format(worker_id))
log.debug(f"Started worker {worker_id}")

while stats.running:
from_uid = random.randint(0, n_accounts - 1)
@@ -99,9 +99,9 @@ async def run_random_worker(
await bank_transfer(pg_conn, from_uid, to_uid, amount)
stats.inc_progress(worker_id)

log.debug("Executed transfer({}) {} => {}".format(amount, from_uid, to_uid))
log.debug(f"Executed transfer({amount}) {from_uid} => {to_uid}")

log.debug("Finished worker {}".format(worker_id))
log.debug(f"Finished worker {worker_id}")

await pg_conn.close()

2
vendor/postgres-v14
vendored
Submodule vendor/postgres-v14 updated: 748643b468...a7b4c66156
2
vendor/postgres-v15
vendored
Submodule vendor/postgres-v15 updated: e7651e79c0...64b8c7bccc
4
vendor/revisions.json
vendored
@@ -1,5 +1,5 @@
{
"postgres-v16": "3946b2e2ea71d07af092099cb5bcae76a69b90d6",
"postgres-v15": "e7651e79c0c27fbddc3c724f5b9553222c28e395",
"postgres-v14": "748643b4683e9fe3b105011a6ba8a687d032cd65"
"postgres-v15": "64b8c7bccc6b77e04795e2d4cf6ad82dc8d987ed",
"postgres-v14": "a7b4c66156bce00afa60e5592d4284ba9e40b4cf"
}

@@ -187,6 +187,14 @@ files:
query: |
select sum(pg_database_size(datname)) as total from pg_database;

- metric_name: lfc_approximate_working_set_size
type: gauge
help: 'Approximate working set size in pages of 8192 bytes'
key_labels:
values: [approximate_working_set_size]
query: |
select neon.approximate_working_set_size(false) as approximate_working_set_size;

build: |
# Build cgroup-tools
#