Compare commits

..

4 Commits

Author SHA1 Message Date
Konstantin Knizhnik
2831a17bfa Limit number of AUX files deltas to reduce reconstruct time 2024-02-22 08:57:09 +02:00
Konstantin Knizhnik
55574da76d Bump Postgres version 2024-02-21 21:32:14 +02:00
Konstantin Knizhnik
997093b7cd Bu,p postgres versions 2024-02-21 21:30:46 +02:00
Konstantin Knizhnik
98f51df0b1 Flush logical messages with snapshots and replication origin 2024-02-21 21:30:46 +02:00
39 changed files with 271 additions and 847 deletions

View File

@@ -16,14 +16,8 @@ concurrency:
cancel-in-progress: ${{ github.event_name == 'pull_request' }}
jobs:
check-permissions:
if: ${{ !contains(github.event.pull_request.labels.*.name, 'run-no-ci') }}
uses: ./.github/workflows/check-permissions.yml
with:
github-event-name: ${{ github.event_name}}
actionlint:
needs: [ check-permissions ]
if: ${{ !contains(github.event.pull_request.labels.*.name, 'run-no-ci') }}
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4

View File

@@ -27,9 +27,24 @@ env:
jobs:
check-permissions:
if: ${{ !contains(github.event.pull_request.labels.*.name, 'run-no-ci') }}
uses: ./.github/workflows/check-permissions.yml
with:
github-event-name: ${{ github.event_name}}
runs-on: ubuntu-latest
steps:
- name: Disallow PRs from forks
if: |
github.event_name == 'pull_request' &&
github.event.pull_request.head.repo.full_name != github.repository
run: |
if [ "${{ contains(fromJSON('["OWNER", "MEMBER", "COLLABORATOR"]'), github.event.pull_request.author_association) }}" = "true" ]; then
MESSAGE="Please create a PR from a branch of ${GITHUB_REPOSITORY} instead of a fork"
else
MESSAGE="The PR should be reviewed and labelled with 'approved-for-ci-run' to trigger a CI run"
fi
echo >&2 "We don't run CI for PRs from forks"
echo >&2 "${MESSAGE}"
exit 1
cancel-previous-e2e-tests:
needs: [ check-permissions ]

View File

@@ -1,36 +0,0 @@
name: Check Permissions
on:
workflow_call:
inputs:
github-event-name:
required: true
type: string
defaults:
run:
shell: bash -euo pipefail {0}
# No permission for GITHUB_TOKEN by default; the **minimal required** set of permissions should be granted in each job.
permissions: {}
jobs:
check-permissions:
runs-on: ubuntu-latest
steps:
- name: Disallow CI runs on PRs from forks
if: |
inputs.github-event-name == 'pull_request' &&
github.event.pull_request.head.repo.full_name != github.repository
run: |
if [ "${{ contains(fromJSON('["OWNER", "MEMBER", "COLLABORATOR"]'), github.event.pull_request.author_association) }}" = "true" ]; then
MESSAGE="Please create a PR from a branch of ${GITHUB_REPOSITORY} instead of a fork"
else
MESSAGE="The PR should be reviewed and labelled with 'approved-for-ci-run' to trigger a CI run"
fi
# TODO: use actions/github-script to post this message as a PR comment
echo >&2 "We don't run CI for PRs from forks"
echo >&2 "${MESSAGE}"
exit 1

View File

@@ -20,14 +20,7 @@ env:
COPT: '-Werror'
jobs:
check-permissions:
if: ${{ !contains(github.event.pull_request.labels.*.name, 'run-no-ci') }}
uses: ./.github/workflows/check-permissions.yml
with:
github-event-name: ${{ github.event_name}}
check-macos-build:
needs: [ check-permissions ]
if: |
contains(github.event.pull_request.labels.*.name, 'run-extra-build-macos') ||
contains(github.event.pull_request.labels.*.name, 'run-extra-build-*') ||
@@ -123,8 +116,8 @@ jobs:
run: ./run_clippy.sh
check-linux-arm-build:
needs: [ check-permissions ]
timeout-minutes: 90
if: ${{ !contains(github.event.pull_request.labels.*.name, 'run-no-ci') }}
runs-on: [ self-hosted, dev, arm64 ]
env:
@@ -244,8 +237,8 @@ jobs:
cargo nextest run --package remote_storage --test test_real_azure
check-codestyle-rust-arm:
needs: [ check-permissions ]
timeout-minutes: 90
if: ${{ !contains(github.event.pull_request.labels.*.name, 'run-no-ci') }}
runs-on: [ self-hosted, dev, arm64 ]
container:
@@ -316,7 +309,6 @@ jobs:
run: cargo deny check
gather-rust-build-stats:
needs: [ check-permissions ]
if: |
contains(github.event.pull_request.labels.*.name, 'run-extra-build-stats') ||
contains(github.event.pull_request.labels.*.name, 'run-extra-build-*') ||

1
Cargo.lock generated
View File

@@ -3552,7 +3552,6 @@ dependencies = [
"const_format",
"enum-map",
"hex",
"humantime",
"humantime-serde",
"itertools",
"postgres_ffi",

View File

@@ -5,7 +5,7 @@
Neon is a serverless open-source alternative to AWS Aurora Postgres. It separates storage and compute and substitutes the PostgreSQL storage layer by redistributing data across a cluster of nodes.
## Quick start
Try the [Neon Free Tier](https://neon.tech) to create a serverless Postgres instance. Then connect to it with your preferred Postgres client (psql, dbeaver, etc) or use the online [SQL Editor](https://neon.tech/docs/get-started-with-neon/query-with-neon-sql-editor/). See [Connect from any application](https://neon.tech/docs/connect/connect-from-any-app/) for connection instructions.
Try the [Neon Free Tier](https://neon.tech/docs/introduction/technical-preview-free-tier/) to create a serverless Postgres instance. Then connect to it with your preferred Postgres client (psql, dbeaver, etc) or use the online [SQL Editor](https://neon.tech/docs/get-started-with-neon/query-with-neon-sql-editor/). See [Connect from any application](https://neon.tech/docs/connect/connect-from-any-app/) for connection instructions.
Alternatively, compile and run the project [locally](#running-local-installation).

View File

@@ -777,9 +777,6 @@ BEGIN
END
$$;"#,
"GRANT pg_monitor TO neon_superuser WITH ADMIN OPTION",
// ensure tables created by superusers (i.e., when creating extensions) can be used by neon_superuser.
"ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON TABLES TO neon_superuser",
"ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON SEQUENCES TO neon_superuser",
];
let mut query = "CREATE SCHEMA IF NOT EXISTS neon_migration";

View File

@@ -66,7 +66,14 @@ fn get_state(request: &Request<Body>) -> &HttpState {
async fn handle_re_attach(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
let reattach_req = json_request::<ReAttachRequest>(&mut req).await?;
let state = get_state(&req);
json_response(StatusCode::OK, state.service.re_attach(reattach_req).await?)
json_response(
StatusCode::OK,
state
.service
.re_attach(reattach_req)
.await
.map_err(ApiError::InternalServerError)?,
)
}
/// Pageserver calls into this before doing deletions, to confirm that it still
@@ -325,10 +332,7 @@ async fn handle_node_configure(mut req: Request<Body>) -> Result<Response<Body>,
}
let state = get_state(&req);
json_response(
StatusCode::OK,
state.service.node_configure(config_req).await?,
)
json_response(StatusCode::OK, state.service.node_configure(config_req)?)
}
async fn handle_tenant_shard_split(

View File

@@ -10,7 +10,7 @@ use crate::persistence::NodePersistence;
///
/// The persistent subset of the Node is defined in [`crate::persistence::NodePersistence`]: the
/// implementation of serialization on this type is only for debug dumps.
#[derive(Clone, Serialize)]
#[derive(Clone, Serialize, Eq, PartialEq)]
pub(crate) struct Node {
pub(crate) id: NodeId,

View File

@@ -6,7 +6,7 @@ use std::time::Duration;
use self::split_state::SplitState;
use camino::Utf8Path;
use camino::Utf8PathBuf;
use control_plane::attachment_service::NodeSchedulingPolicy;
use control_plane::attachment_service::{NodeAvailability, NodeSchedulingPolicy};
use diesel::pg::PgConnection;
use diesel::prelude::*;
use diesel::Connection;
@@ -130,10 +130,24 @@ impl Persistence {
}
/// At startup, populate the list of nodes which our shards may be placed on
pub(crate) async fn list_nodes(&self) -> DatabaseResult<Vec<NodePersistence>> {
let nodes: Vec<NodePersistence> = self
pub(crate) async fn list_nodes(&self) -> DatabaseResult<Vec<Node>> {
let nodes: Vec<Node> = self
.with_conn(move |conn| -> DatabaseResult<_> {
Ok(crate::schema::nodes::table.load::<NodePersistence>(conn)?)
Ok(crate::schema::nodes::table
.load::<NodePersistence>(conn)?
.into_iter()
.map(|n| Node {
id: NodeId(n.node_id as u64),
// At startup we consider a node offline until proven otherwise.
availability: NodeAvailability::Offline,
scheduling: NodeSchedulingPolicy::from_str(&n.scheduling_policy)
.expect("Bad scheduling policy in DB"),
listen_http_addr: n.listen_http_addr,
listen_http_port: n.listen_http_port as u16,
listen_pg_addr: n.listen_pg_addr,
listen_pg_port: n.listen_pg_port as u16,
})
.collect::<Vec<Node>>())
})
.await?;
@@ -142,31 +156,6 @@ impl Persistence {
Ok(nodes)
}
pub(crate) async fn update_node(
&self,
input_node_id: NodeId,
input_scheduling: NodeSchedulingPolicy,
) -> DatabaseResult<()> {
use crate::schema::nodes::dsl::*;
let updated = self
.with_conn(move |conn| {
let updated = diesel::update(nodes)
.filter(node_id.eq(input_node_id.0 as i64))
.set((scheduling_policy.eq(String::from(input_scheduling)),))
.execute(conn)?;
Ok(updated)
})
.await?;
if updated != 1 {
Err(DatabaseError::Logical(format!(
"Node {node_id:?} not found for update",
)))
} else {
Ok(())
}
}
/// At startup, load the high level state for shards, such as their config + policy. This will
/// be enriched at runtime with state discovered on pageservers.
pub(crate) async fn list_tenant_shards(&self) -> DatabaseResult<Vec<TenantShardPersistence>> {
@@ -517,7 +506,7 @@ pub(crate) struct TenantShardPersistence {
}
/// Parts of [`crate::node::Node`] that are stored durably
#[derive(Serialize, Deserialize, Queryable, Selectable, Insertable, Eq, PartialEq)]
#[derive(Serialize, Deserialize, Queryable, Selectable, Insertable)]
#[diesel(table_name = crate::schema::nodes)]
pub(crate) struct NodePersistence {
pub(crate) node_id: i64,

View File

@@ -438,7 +438,7 @@ impl Reconciler {
match self.observed.locations.get(&node_id) {
Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {
// Nothing to do
tracing::info!(%node_id, "Observed configuration already correct.")
tracing::info!("Observed configuration already correct.")
}
_ => {
// In all cases other than a matching observed configuration, we will
@@ -449,7 +449,7 @@ impl Reconciler {
.increment_generation(self.tenant_shard_id, node_id)
.await?;
wanted_conf.generation = self.generation.into();
tracing::info!(%node_id, "Observed configuration requires update.");
tracing::info!("Observed configuration requires update.");
self.location_config(node_id, wanted_conf, None).await?;
self.compute_notify().await?;
}

View File

@@ -175,33 +175,6 @@ impl Scheduler {
}
}
/// Where we have several nodes to choose from, for example when picking a secondary location
/// to promote to an attached location, this method may be used to pick the best choice based
/// on the scheduler's knowledge of utilization and availability.
///
/// If the input is empty, or all the nodes are not elegible for scheduling, return None: the
/// caller can pick a node some other way.
pub(crate) fn node_preferred(&self, nodes: &[NodeId]) -> Option<NodeId> {
if nodes.is_empty() {
return None;
}
let node = nodes
.iter()
.map(|node_id| {
let may_schedule = self
.nodes
.get(node_id)
.map(|n| n.may_schedule)
.unwrap_or(false);
(*node_id, may_schedule)
})
.max_by_key(|(_n, may_schedule)| *may_schedule);
// If even the preferred node has may_schedule==false, return None
node.and_then(|(node_id, may_schedule)| if may_schedule { Some(node_id) } else { None })
}
pub(crate) fn schedule_shard(&self, hard_exclude: &[NodeId]) -> Result<NodeId, ScheduleError> {
if self.nodes.is_empty() {
return Err(ScheduleError::NoPageservers);
@@ -251,45 +224,44 @@ impl Scheduler {
}
}
#[cfg(test)]
pub(crate) mod test_utils {
use crate::node::Node;
use control_plane::attachment_service::{NodeAvailability, NodeSchedulingPolicy};
use std::collections::HashMap;
use utils::id::NodeId;
/// Test helper: synthesize the requested number of nodes, all in active state.
///
/// Node IDs start at one.
pub(crate) fn make_test_nodes(n: u64) -> HashMap<NodeId, Node> {
(1..n + 1)
.map(|i| {
(
NodeId(i),
Node {
id: NodeId(i),
availability: NodeAvailability::Active,
scheduling: NodeSchedulingPolicy::Active,
listen_http_addr: format!("httphost-{i}"),
listen_http_port: 80 + i as u16,
listen_pg_addr: format!("pghost-{i}"),
listen_pg_port: 5432 + i as u16,
},
)
})
.collect()
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::collections::HashMap;
use control_plane::attachment_service::{NodeAvailability, NodeSchedulingPolicy};
use utils::id::NodeId;
use crate::tenant_state::IntentState;
use crate::{node::Node, tenant_state::IntentState};
#[test]
fn scheduler_basic() -> anyhow::Result<()> {
let nodes = test_utils::make_test_nodes(2);
let mut nodes = HashMap::new();
nodes.insert(
NodeId(1),
Node {
id: NodeId(1),
availability: NodeAvailability::Active,
scheduling: NodeSchedulingPolicy::Active,
listen_http_addr: String::new(),
listen_http_port: 0,
listen_pg_addr: String::new(),
listen_pg_port: 0,
},
);
nodes.insert(
NodeId(2),
Node {
id: NodeId(2),
availability: NodeAvailability::Active,
scheduling: NodeSchedulingPolicy::Active,
listen_http_addr: String::new(),
listen_http_port: 0,
listen_pg_addr: String::new(),
listen_pg_port: 0,
},
);
let mut scheduler = Scheduler::new(nodes.values());
let mut t1_intent = IntentState::new();

View File

@@ -56,11 +56,6 @@ use crate::{
PlacementPolicy, Sequence,
};
// For operations that should be quick, like attaching a new tenant
const SHORT_RECONCILE_TIMEOUT: Duration = Duration::from_secs(5);
// For operations that might be slow, like migrating a tenant with
// some data in it.
const RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
/// How long [`Service::startup_reconcile`] is allowed to take before it should give
@@ -484,8 +479,8 @@ impl Service {
async move {
if let Err(e) = compute_hook.notify(tenant_shard_id, node_id, &cancel).await {
tracing::error!(
%tenant_shard_id,
%node_id,
tenant_shard_id=%tenant_shard_id,
node_id=%node_id,
"Failed to notify compute on startup for shard: {e}"
);
None
@@ -622,22 +617,7 @@ impl Service {
let (result_tx, result_rx) = tokio::sync::mpsc::unbounded_channel();
tracing::info!("Loading nodes from database...");
let nodes = persistence
.list_nodes()
.await?
.into_iter()
.map(|n| Node {
id: NodeId(n.node_id as u64),
// At startup we consider a node offline until proven otherwise.
availability: NodeAvailability::Offline,
scheduling: NodeSchedulingPolicy::from_str(&n.scheduling_policy)
.expect("Bad scheduling policy in DB"),
listen_http_addr: n.listen_http_addr,
listen_http_port: n.listen_http_port as u16,
listen_pg_addr: n.listen_pg_addr,
listen_pg_port: n.listen_pg_port as u16,
})
.collect::<Vec<_>>();
let nodes = persistence.list_nodes().await?;
let nodes: HashMap<NodeId, Node> = nodes.into_iter().map(|n| (n.id, n)).collect();
tracing::info!("Loaded {} nodes from database.", nodes.len());
@@ -929,15 +909,7 @@ impl Service {
pub(crate) async fn re_attach(
&self,
reattach_req: ReAttachRequest,
) -> Result<ReAttachResponse, ApiError> {
// Take a re-attach as indication that the node is available: this is a precursor to proper
// heartbeating in https://github.com/neondatabase/neon/issues/6844
self.node_configure(NodeConfigureRequest {
node_id: reattach_req.node_id,
availability: Some(NodeAvailability::Active),
scheduling: None,
})?;
) -> anyhow::Result<ReAttachResponse> {
// Ordering: we must persist generation number updates before making them visible in the in-memory state
let incremented_generations = self.persistence.re_attach(reattach_req.node_id).await?;
@@ -1028,16 +1000,6 @@ impl Service {
&self,
create_req: TenantCreateRequest,
) -> Result<TenantCreateResponse, ApiError> {
let (response, waiters) = self.do_tenant_create(create_req).await?;
self.await_waiters(waiters, SHORT_RECONCILE_TIMEOUT).await?;
Ok(response)
}
pub(crate) async fn do_tenant_create(
&self,
create_req: TenantCreateRequest,
) -> Result<(TenantCreateResponse, Vec<ReconcilerWaiter>), ApiError> {
// This service expects to handle sharding itself: it is an error to try and directly create
// a particular shard here.
let tenant_id = if !create_req.new_tenant_id.is_unsharded() {
@@ -1187,12 +1149,11 @@ impl Service {
(waiters, response_shards)
};
Ok((
TenantCreateResponse {
shards: response_shards,
},
waiters,
))
self.await_waiters(waiters).await?;
Ok(TenantCreateResponse {
shards: response_shards,
})
}
/// Helper for functions that reconcile a number of shards, and would like to do a timeout-bounded
@@ -1200,9 +1161,8 @@ impl Service {
async fn await_waiters(
&self,
waiters: Vec<ReconcilerWaiter>,
timeout: Duration,
) -> Result<(), ReconcileWaitError> {
let deadline = Instant::now().checked_add(timeout).unwrap();
let deadline = Instant::now().checked_add(Duration::from_secs(30)).unwrap();
for waiter in waiters {
let timeout = deadline.duration_since(Instant::now());
waiter.wait_timeout(timeout).await?;
@@ -1340,8 +1300,12 @@ impl Service {
}
};
let waiters = if let Some(create_req) = maybe_create {
let (create_resp, waiters) = self.do_tenant_create(create_req).await?;
// TODO: if we timeout/fail on reconcile, we should still succeed this request,
// because otherwise a broken compute hook causes a feedback loop where
// location_config returns 500 and gets retried forever.
if let Some(create_req) = maybe_create {
let create_resp = self.tenant_create(create_req).await?;
result.shards = create_resp
.shards
.into_iter()
@@ -1350,25 +1314,19 @@ impl Service {
shard_id: s.shard_id,
})
.collect();
waiters
} else {
waiters
};
if let Err(e) = self.await_waiters(waiters, SHORT_RECONCILE_TIMEOUT).await {
// Do not treat a reconcile error as fatal: we have already applied any requested
// Intent changes, and the reconcile can fail for external reasons like unavailable
// compute notification API. In these cases, it is important that we do not
// cause the cloud control plane to retry forever on this API.
tracing::warn!(
"Failed to reconcile after /location_config: {e}, returning success anyway"
);
// This was an update, wait for reconciliation
if let Err(e) = self.await_waiters(waiters).await {
// Do not treat a reconcile error as fatal: we have already applied any requested
// Intent changes, and the reconcile can fail for external reasons like unavailable
// compute notification API. In these cases, it is important that we do not
// cause the cloud control plane to retry forever on this API.
tracing::warn!(
"Failed to reconcile after /location_config: {e}, returning success anyway"
);
}
}
// Logging the full result is useful because it lets us cross-check what the cloud control
// plane's tenant_shards table should contain.
tracing::info!("Complete, returning {result:?}");
Ok(result)
}
@@ -2341,11 +2299,7 @@ impl Service {
.context("Scheduler checks")
.map_err(ApiError::InternalServerError)?;
let expect_nodes = locked
.nodes
.values()
.map(|n| n.to_persistent())
.collect::<Vec<_>>();
let expect_nodes = locked.nodes.values().cloned().collect::<Vec<_>>();
let expect_shards = locked
.tenants
@@ -2357,8 +2311,8 @@ impl Service {
};
let mut nodes = self.persistence.list_nodes().await?;
expect_nodes.sort_by_key(|n| n.node_id);
nodes.sort_by_key(|n| n.node_id);
expect_nodes.sort_by_key(|n| n.id);
nodes.sort_by_key(|n| n.id);
if nodes != expect_nodes {
tracing::error!("Consistency check failed on nodes.");
@@ -2372,9 +2326,6 @@ impl Service {
serde_json::to_string(&nodes)
.map_err(|e| ApiError::InternalServerError(e.into()))?
);
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"Node consistency failure"
)));
}
let mut shards = self.persistence.list_tenant_shards().await?;
@@ -2385,17 +2336,14 @@ impl Service {
tracing::error!("Consistency check failed on shards.");
tracing::error!(
"Shards in memory: {}",
serde_json::to_string(&expect_shards)
serde_json::to_string(&expect_nodes)
.map_err(|e| ApiError::InternalServerError(e.into()))?
);
tracing::error!(
"Shards in database: {}",
serde_json::to_string(&shards)
serde_json::to_string(&nodes)
.map_err(|e| ApiError::InternalServerError(e.into()))?
);
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"Shard consistency failure"
)));
}
Ok(())
@@ -2521,18 +2469,7 @@ impl Service {
Ok(())
}
pub(crate) async fn node_configure(
&self,
config_req: NodeConfigureRequest,
) -> Result<(), ApiError> {
if let Some(scheduling) = config_req.scheduling {
// Scheduling is a persistent part of Node: we must write updates to the database before
// applying them in memory
self.persistence
.update_node(config_req.node_id, scheduling)
.await?;
}
pub(crate) fn node_configure(&self, config_req: NodeConfigureRequest) -> Result<(), ApiError> {
let mut locked = self.inner.write().unwrap();
let result_tx = locked.result_tx.clone();
let compute_hook = locked.compute_hook.clone();

View File

@@ -143,23 +143,6 @@ impl IntentState {
}
}
/// Like set_attached, but the node is from [`Self::secondary`]. This swaps the node from
/// secondary to attached while maintaining the scheduler's reference counts.
pub(crate) fn promote_attached(
&mut self,
_scheduler: &mut Scheduler,
promote_secondary: NodeId,
) {
// If we call this with a node that isn't in secondary, it would cause incorrect
// scheduler reference counting, since we assume the node is already referenced as a secondary.
debug_assert!(self.secondary.contains(&promote_secondary));
// TODO: when scheduler starts tracking attached + secondary counts separately, we will
// need to call into it here.
self.secondary.retain(|n| n != &promote_secondary);
self.attached = Some(promote_secondary);
}
pub(crate) fn push_secondary(&mut self, scheduler: &mut Scheduler, new_secondary: NodeId) {
debug_assert!(!self.secondary.contains(&new_secondary));
scheduler.node_inc_ref(new_secondary);
@@ -214,8 +197,6 @@ impl IntentState {
/// Returns true if a change was made
pub(crate) fn notify_offline(&mut self, node_id: NodeId) -> bool {
if self.attached == Some(node_id) {
// TODO: when scheduler starts tracking attached + secondary counts separately, we will
// need to call into it here.
self.attached = None;
self.secondary.push(node_id);
true
@@ -389,9 +370,6 @@ impl TenantState {
// All remaining observed locations generate secondary intents. This includes None
// observations, as these may well have some local content on disk that is usable (this
// is an edge case that might occur if we restarted during a migration or other change)
//
// We may leave intent.attached empty if we didn't find any attached locations: [`Self::schedule`]
// will take care of promoting one of these secondaries to be attached.
self.observed.locations.keys().for_each(|node_id| {
if Some(*node_id) != self.intent.attached {
self.intent.secondary.push(*node_id);
@@ -399,33 +377,6 @@ impl TenantState {
});
}
/// Part of [`Self::schedule`] that is used to choose exactly one node to act as the
/// attached pageserver for a shard.
///
/// Returns whether we modified it, and the NodeId selected.
fn schedule_attached(
&mut self,
scheduler: &mut Scheduler,
) -> Result<(bool, NodeId), ScheduleError> {
// No work to do if we already have an attached tenant
if let Some(node_id) = self.intent.attached {
return Ok((false, node_id));
}
if let Some(promote_secondary) = scheduler.node_preferred(&self.intent.secondary) {
// Promote a secondary
tracing::debug!("Promoted secondary {} to attached", promote_secondary);
self.intent.promote_attached(scheduler, promote_secondary);
Ok((true, promote_secondary))
} else {
// Pick a fresh node: either we had no secondaries or none were schedulable
let node_id = scheduler.schedule_shard(&self.intent.secondary)?;
tracing::debug!("Selected {} as attached", node_id);
self.intent.set_attached(scheduler, Some(node_id));
Ok((true, node_id))
}
}
pub(crate) fn schedule(&mut self, scheduler: &mut Scheduler) -> Result<(), ScheduleError> {
// TODO: before scheduling new nodes, check if any existing content in
// self.intent refers to pageservers that are offline, and pick other
@@ -436,15 +387,19 @@ impl TenantState {
// Build the set of pageservers already in use by this tenant, to avoid scheduling
// more work on the same pageservers we're already using.
let mut used_pageservers = self.intent.all_pageservers();
let mut modified = false;
use PlacementPolicy::*;
match self.policy {
Single => {
// Should have exactly one attached, and zero secondaries
let (modified_attached, _attached_node_id) = self.schedule_attached(scheduler)?;
modified |= modified_attached;
if self.intent.attached.is_none() {
let node_id = scheduler.schedule_shard(&used_pageservers)?;
self.intent.set_attached(scheduler, Some(node_id));
used_pageservers.push(node_id);
modified = true;
}
if !self.intent.secondary.is_empty() {
self.intent.clear_secondary(scheduler);
modified = true;
@@ -452,10 +407,13 @@ impl TenantState {
}
Double(secondary_count) => {
// Should have exactly one attached, and N secondaries
let (modified_attached, attached_node_id) = self.schedule_attached(scheduler)?;
modified |= modified_attached;
if self.intent.attached.is_none() {
let node_id = scheduler.schedule_shard(&used_pageservers)?;
self.intent.set_attached(scheduler, Some(node_id));
used_pageservers.push(node_id);
modified = true;
}
let mut used_pageservers = vec![attached_node_id];
while self.intent.secondary.len() < secondary_count {
let node_id = scheduler.schedule_shard(&used_pageservers)?;
self.intent.push_secondary(scheduler, node_id);
@@ -737,95 +695,10 @@ impl TenantState {
shard_count: self.tenant_shard_id.shard_count.literal() as i32,
shard_stripe_size: self.shard.stripe_size.0 as i32,
generation: self.generation.into().unwrap_or(0) as i32,
generation_pageserver: self
.intent
.get_attached()
.map(|n| n.0 as i64)
.unwrap_or(i64::MAX),
generation_pageserver: i64::MAX,
placement_policy: serde_json::to_string(&self.policy).unwrap(),
config: serde_json::to_string(&self.config).unwrap(),
splitting: SplitState::default(),
}
}
}
#[cfg(test)]
pub(crate) mod tests {
use pageserver_api::shard::{ShardCount, ShardNumber};
use utils::id::TenantId;
use crate::scheduler::test_utils::make_test_nodes;
use super::*;
fn make_test_tenant_shard(policy: PlacementPolicy) -> TenantState {
let tenant_id = TenantId::generate();
let shard_number = ShardNumber(0);
let shard_count = ShardCount::new(1);
let tenant_shard_id = TenantShardId {
tenant_id,
shard_number,
shard_count,
};
TenantState::new(
tenant_shard_id,
ShardIdentity::new(
shard_number,
shard_count,
pageserver_api::shard::ShardStripeSize(32768),
)
.unwrap(),
policy,
)
}
/// Test the scheduling behaviors used when a tenant configured for HA is subject
/// to nodes being marked offline.
#[test]
fn tenant_ha_scheduling() -> anyhow::Result<()> {
// Start with three nodes. Our tenant will only use two. The third one is
// expected to remain unused.
let mut nodes = make_test_nodes(3);
let mut scheduler = Scheduler::new(nodes.values());
let mut tenant_state = make_test_tenant_shard(PlacementPolicy::Double(1));
tenant_state
.schedule(&mut scheduler)
.expect("we have enough nodes, scheduling should work");
// Expect to initially be schedule on to different nodes
assert_eq!(tenant_state.intent.secondary.len(), 1);
assert!(tenant_state.intent.attached.is_some());
let attached_node_id = tenant_state.intent.attached.unwrap();
let secondary_node_id = *tenant_state.intent.secondary.iter().last().unwrap();
assert_ne!(attached_node_id, secondary_node_id);
// Notifying the attached node is offline should demote it to a secondary
let changed = tenant_state.intent.notify_offline(attached_node_id);
assert!(changed);
// Update the scheduler state to indicate the node is offline
nodes.get_mut(&attached_node_id).unwrap().availability = NodeAvailability::Offline;
scheduler.node_upsert(nodes.get(&attached_node_id).unwrap());
// Scheduling the node should promote the still-available secondary node to attached
tenant_state
.schedule(&mut scheduler)
.expect("active nodes are available");
assert_eq!(tenant_state.intent.attached.unwrap(), secondary_node_id);
// The original attached node should have been retained as a secondary
assert_eq!(
*tenant_state.intent.secondary.iter().last().unwrap(),
attached_node_id
);
tenant_state.intent.clear(&mut scheduler);
Ok(())
}
}

View File

@@ -616,7 +616,7 @@ async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::Local
let tenant_id = get_tenant_id(create_match, env)?;
let new_branch_name = create_match
.get_one::<String>("branch-name")
.ok_or_else(|| anyhow!("No branch name provided"))?;
.ok_or_else(|| anyhow!("No branch name provided"))?; // TODO
let pg_version = create_match
.get_one::<u32>("pg-version")

View File

@@ -210,25 +210,6 @@ impl PageServerNode {
update_config: bool,
register: bool,
) -> anyhow::Result<()> {
// Register the node with the storage controller before starting pageserver: pageserver must be registered to
// successfully call /re-attach and finish starting up.
if register {
let attachment_service = AttachmentService::from_env(&self.env);
let (pg_host, pg_port) =
parse_host_port(&self.conf.listen_pg_addr).expect("Unable to parse listen_pg_addr");
let (http_host, http_port) = parse_host_port(&self.conf.listen_http_addr)
.expect("Unable to parse listen_http_addr");
attachment_service
.node_register(NodeRegisterRequest {
node_id: self.conf.id,
listen_pg_addr: pg_host.to_string(),
listen_pg_port: pg_port.unwrap_or(5432),
listen_http_addr: http_host.to_string(),
listen_http_port: http_port.unwrap_or(80),
})
.await?;
}
// TODO: using a thread here because start_process() is not async but we need to call check_status()
let datadir = self.repo_path();
print!(
@@ -267,6 +248,23 @@ impl PageServerNode {
)
.await?;
if register {
let attachment_service = AttachmentService::from_env(&self.env);
let (pg_host, pg_port) =
parse_host_port(&self.conf.listen_pg_addr).expect("Unable to parse listen_pg_addr");
let (http_host, http_port) = parse_host_port(&self.conf.listen_http_addr)
.expect("Unable to parse listen_http_addr");
attachment_service
.node_register(NodeRegisterRequest {
node_id: self.conf.id,
listen_pg_addr: pg_host.to_string(),
listen_pg_port: pg_port.unwrap_or(5432),
listen_http_addr: http_host.to_string(),
listen_http_port: http_port.unwrap_or(80),
})
.await?;
}
Ok(())
}

View File

@@ -18,7 +18,6 @@ enum-map.workspace = true
strum.workspace = true
strum_macros.workspace = true
hex.workspace = true
humantime.workspace = true
thiserror.workspace = true
humantime-serde.workspace = true
chrono.workspace = true

View File

@@ -1,7 +1,4 @@
pub mod partitioning;
pub mod utilization;
pub use utilization::PageserverUtilization;
use std::{
collections::HashMap,
@@ -340,7 +337,7 @@ impl ThrottleConfig {
}
/// The requests per second allowed by the given config.
pub fn steady_rps(&self) -> f64 {
(self.refill_amount.get() as f64) / (self.refill_interval.as_secs_f64())
(self.refill_amount.get() as f64) / (self.refill_interval.as_secs_f64()) / 1e3
}
}

View File

@@ -1,70 +0,0 @@
use std::time::SystemTime;
/// Pageserver current utilization and scoring for how good candidate the pageserver would be for
/// the next tenant.
///
/// See and maintain pageserver openapi spec for `/v1/utilization_score` as the truth.
///
/// `format: int64` fields must use `ser_saturating_u63` because openapi generated clients might
/// not handle full u64 values properly.
#[derive(serde::Serialize, Debug)]
pub struct PageserverUtilization {
/// Used disk space
#[serde(serialize_with = "ser_saturating_u63")]
pub disk_usage_bytes: u64,
/// Free disk space
#[serde(serialize_with = "ser_saturating_u63")]
pub free_space_bytes: u64,
/// Lower is better score for how good candidate for a next tenant would this pageserver be.
#[serde(serialize_with = "ser_saturating_u63")]
pub utilization_score: u64,
/// When was this snapshot captured, pageserver local time.
///
/// Use millis to give confidence that the value is regenerated often enough.
#[serde(serialize_with = "ser_rfc3339_millis")]
pub captured_at: SystemTime,
}
fn ser_rfc3339_millis<S: serde::Serializer>(
ts: &SystemTime,
serializer: S,
) -> Result<S::Ok, S::Error> {
serializer.collect_str(&humantime::format_rfc3339_millis(*ts))
}
/// openapi knows only `format: int64`, so avoid outputting a non-parseable value by generated clients.
///
/// Instead of newtype, use this because a newtype would get require handling deserializing values
/// with the highest bit set which is properly parsed by serde formats, but would create a
/// conundrum on how to handle and again serialize such values at type level. It will be a few
/// years until we can use more than `i64::MAX` bytes on a disk.
fn ser_saturating_u63<S: serde::Serializer>(value: &u64, serializer: S) -> Result<S::Ok, S::Error> {
const MAX_FORMAT_INT64: u64 = i64::MAX as u64;
let value = (*value).min(MAX_FORMAT_INT64);
serializer.serialize_u64(value)
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use super::*;
#[test]
fn u64_max_is_serialized_as_u63_max() {
let doc = PageserverUtilization {
disk_usage_bytes: u64::MAX,
free_space_bytes: 0,
utilization_score: u64::MAX,
captured_at: SystemTime::UNIX_EPOCH + Duration::from_secs(1708509779),
};
let s = serde_json::to_string(&doc).unwrap();
let expected = r#"{"disk_usage_bytes":9223372036854775807,"free_space_bytes":0,"utilization_score":9223372036854775807,"captured_at":"2024-02-21T10:02:59.000Z"}"#;
assert_eq!(s, expected);
}
}

View File

@@ -1379,25 +1379,6 @@ paths:
schema:
$ref: "#/components/schemas/ServiceUnavailableError"
/v1/utilization:
get:
description: |
Returns the pageservers current utilization and fitness score for new tenants.
responses:
"200":
description: Pageserver utilization and fitness score
content:
application/json:
schema:
$ref: "#/components/schemas/PageserverUtilization"
"500":
description: Generic operation error
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
components:
securitySchemes:
JWT:
@@ -1710,33 +1691,6 @@ components:
type: string
enum: [past, present, future, nodata]
PageserverUtilization:
type: object
required:
- disk_usage_bytes
- free_space_bytes
- utilization_score
properties:
disk_usage_bytes:
type: integer
format: int64
minimum: 0
description: The amount of disk space currently utilized by layer files.
free_space_bytes:
type: integer
format: int64
minimum: 0
description: The amount of usable disk space left.
utilization_score:
type: integer
format: int64
minimum: 0
maximum: 9223372036854775807
default: 9223372036854775807
description: |
Lower is better score for how good this pageserver would be for the next tenant.
The default or maximum value can be returned in situations when a proper score cannot (yet) be calculated.
Error:
type: object
required:

View File

@@ -100,7 +100,6 @@ pub struct State {
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
deletion_queue_client: DeletionQueueClient,
secondary_controller: SecondaryController,
latest_utilization: tokio::sync::Mutex<Option<(std::time::Instant, bytes::Bytes)>>,
}
impl State {
@@ -129,7 +128,6 @@ impl State {
disk_usage_eviction_state,
deletion_queue_client,
secondary_controller,
latest_utilization: Default::default(),
})
}
}
@@ -1965,54 +1963,6 @@ async fn put_io_engine_handler(
json_response(StatusCode::OK, ())
}
/// Polled by control plane.
///
/// See [`crate::utilization`].
async fn get_utilization(
r: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
// this probably could be completely public, but lets make that change later.
check_permission(&r, None)?;
let state = get_state(&r);
let mut g = state.latest_utilization.lock().await;
let regenerate_every = Duration::from_secs(1);
let still_valid = g
.as_ref()
.is_some_and(|(captured_at, _)| captured_at.elapsed() < regenerate_every);
// avoid needless statvfs calls even though those should be non-blocking fast.
// regenerate at most 1Hz to allow polling at any rate.
if !still_valid {
let path = state.conf.tenants_path();
let doc = crate::utilization::regenerate(path.as_std_path())
.map_err(ApiError::InternalServerError)?;
let mut buf = Vec::new();
serde_json::to_writer(&mut buf, &doc)
.context("serialize")
.map_err(ApiError::InternalServerError)?;
let body = bytes::Bytes::from(buf);
*g = Some((std::time::Instant::now(), body));
}
// hyper 0.14 doesn't yet have Response::clone so this is a bit of extra legwork
let cached = g.as_ref().expect("just set").1.clone();
Response::builder()
.header(hyper::http::header::CONTENT_TYPE, "application/json")
// thought of using http date header, but that is second precision which does not give any
// debugging aid
.status(StatusCode::OK)
.body(hyper::Body::from(cached))
.context("build response")
.map_err(ApiError::InternalServerError)
}
/// Common functionality of all the HTTP API handlers.
///
/// - Adds a tracing span to each request (by `request_span`)
@@ -2274,6 +2224,5 @@ pub fn make_router(
|r| api_handler(r, timeline_collect_keyspace),
)
.put("/v1/io_engine", |r| api_handler(r, put_io_engine_handler))
.get("/v1/utilization", |r| api_handler(r, get_utilization))
.any(handler_404))
}

View File

@@ -22,7 +22,6 @@ pub(crate) mod statvfs;
pub mod task_mgr;
pub mod tenant;
pub mod trace;
pub mod utilization;
pub mod virtual_file;
pub mod walingest;
pub mod walrecord;

View File

@@ -36,6 +36,8 @@ use tracing::{debug, trace, warn};
use utils::bin_ser::DeserializeError;
use utils::{bin_ser::BeSer, lsn::Lsn};
const MAX_AUX_FILE_DELTAS: usize = 1024;
#[derive(Debug)]
pub enum LsnForTimestamp {
/// Found commits both before and after the given timestamp
@@ -1403,16 +1405,20 @@ impl<'a> DatadirModification<'a> {
let dir = if let Some(mut dir) = self.pending_aux_files.take() {
// We already updated aux files in `self`: emit a delta and update our latest value
self.put(
AUX_FILES_KEY,
Value::WalRecord(NeonWalRecord::AuxFile {
file_path: file_path.clone(),
content: content.clone(),
}),
);
dir.upsert(file_path, content);
dir.upsert(file_path.clone(), content.clone());
if dir.files.len() % MAX_AUX_FILE_DELTAS == 0 {
self.put(
AUX_FILES_KEY,
Value::Image(Bytes::from(
AuxFilesDirectory::ser(&dir).context("serialize")?,
)),
);
} else {
self.put(
AUX_FILES_KEY,
Value::WalRecord(NeonWalRecord::AuxFile { file_path, content }),
);
}
dir
} else {
// Check if the AUX_FILES_KEY is initialized

View File

@@ -37,7 +37,6 @@ use crate::tenant::{
remote_timeline_client::{download::download_layer_file, remote_heatmap_path},
};
use camino::Utf8PathBuf;
use chrono::format::{DelayedFormat, StrftimeItems};
use futures::Future;
use pageserver_api::shard::TenantShardId;
@@ -779,32 +778,19 @@ async fn init_timeline_state(
.await
.fatal_err(&format!("Listing {timeline_path}"))
{
let Ok(file_path) = Utf8PathBuf::from_path_buf(dentry.path()) else {
tracing::warn!("Malformed filename at {}", dentry.path().to_string_lossy());
continue;
};
let local_meta = dentry
.metadata()
.await
.fatal_err(&format!("Read metadata on {}", file_path));
let dentry_file_name = dentry.file_name();
let file_name = dentry_file_name.to_string_lossy();
let local_meta = dentry.metadata().await.fatal_err(&format!(
"Read metadata on {}",
dentry.path().to_string_lossy()
));
let file_name = file_path.file_name().expect("created it from the dentry");
// Secondary mode doesn't use local metadata files, but they might have been left behind by an attached tenant.
if file_name == METADATA_FILE_NAME {
// Secondary mode doesn't use local metadata files, but they might have been left behind by an attached tenant.
continue;
} else if crate::is_temporary(&file_path) {
// Temporary files are frequently left behind from restarting during downloads
tracing::info!("Cleaning up temporary file {file_path}");
if let Err(e) = tokio::fs::remove_file(&file_path)
.await
.or_else(fs_ext::ignore_not_found)
{
tracing::error!("Failed to remove temporary file {file_path}: {e}");
}
continue;
}
match LayerFileName::from_str(file_name) {
match LayerFileName::from_str(&file_name) {
Ok(name) => {
let remote_meta = heatmap_metadata.get(&name);
match remote_meta {

View File

@@ -1,38 +0,0 @@
//! An utilization metric which is used to decide on which pageserver to put next tenant.
//!
//! The metric is exposed via `GET /v1/utilization`. Refer and maintain it's openapi spec as the
//! truth.
use anyhow::Context;
use std::path::Path;
use pageserver_api::models::PageserverUtilization;
pub(crate) fn regenerate(tenants_path: &Path) -> anyhow::Result<PageserverUtilization> {
// TODO: currently the http api ratelimits this to 1Hz at most, which is probably good enough
let statvfs = nix::sys::statvfs::statvfs(tenants_path)
.map_err(std::io::Error::from)
.context("statvfs tenants directory")?;
let blocksz = statvfs.block_size();
#[cfg_attr(not(target_os = "macos"), allow(clippy::unnecessary_cast))]
let free = statvfs.blocks_available() as u64 * blocksz;
let used = crate::metrics::RESIDENT_PHYSICAL_SIZE_GLOBAL.get();
let captured_at = std::time::SystemTime::now();
let doc = PageserverUtilization {
disk_usage_bytes: used,
free_space_bytes: free,
// lower is better; start with a constant
//
// note that u64::MAX will be output as i64::MAX as u64, but that should not matter
utilization_score: u64::MAX,
captured_at,
};
// TODO: make utilization_score into a metric
Ok(doc)
}

View File

@@ -91,7 +91,6 @@ impl PostgresRedoManager {
if rec_neon != batch_neon {
let result = if batch_neon {
self.apply_batch_neon(key, lsn, img, &records[batch_start..i])
.await
} else {
self.apply_batch_postgres(
key,
@@ -112,7 +111,6 @@ impl PostgresRedoManager {
// last batch
if batch_neon {
self.apply_batch_neon(key, lsn, img, &records[batch_start..])
.await
} else {
self.apply_batch_postgres(
key,
@@ -316,7 +314,7 @@ impl PostgresRedoManager {
///
/// Process a batch of WAL records using bespoken Neon code.
///
async fn apply_batch_neon(
fn apply_batch_neon(
&self,
key: Key,
lsn: Lsn,
@@ -334,17 +332,9 @@ impl PostgresRedoManager {
anyhow::bail!("invalid neon WAL redo request with no base image");
}
// process the records in batches and yield; this should guard against pathological
// situations where we accidentially have a huge number of in-neon applied records.
let yield_every = 200;
for records in records.chunks(yield_every) {
// Apply all the WAL records in the batch
for (record_lsn, record) in records {
self.apply_record_neon(key, &mut page, *record_lsn, record)?;
}
tokio::task::yield_now().await;
// Apply all the WAL records in the batch
for (record_lsn, record) in records.iter() {
self.apply_record_neon(key, &mut page, *record_lsn, record)?;
}
// Success!
let duration = start_time.elapsed();

77
poetry.lock generated
View File

@@ -858,43 +858,43 @@ files = [
[[package]]
name = "cryptography"
version = "42.0.4"
version = "42.0.2"
description = "cryptography is a package which provides cryptographic recipes and primitives to Python developers."
optional = false
python-versions = ">=3.7"
files = [
{file = "cryptography-42.0.4-cp37-abi3-macosx_10_12_universal2.whl", hash = "sha256:ffc73996c4fca3d2b6c1c8c12bfd3ad00def8621da24f547626bf06441400449"},
{file = "cryptography-42.0.4-cp37-abi3-macosx_10_12_x86_64.whl", hash = "sha256:db4b65b02f59035037fde0998974d84244a64c3265bdef32a827ab9b63d61b18"},
{file = "cryptography-42.0.4-cp37-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:dad9c385ba8ee025bb0d856714f71d7840020fe176ae0229de618f14dae7a6e2"},
{file = "cryptography-42.0.4-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:69b22ab6506a3fe483d67d1ed878e1602bdd5912a134e6202c1ec672233241c1"},
{file = "cryptography-42.0.4-cp37-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:e09469a2cec88fb7b078e16d4adec594414397e8879a4341c6ace96013463d5b"},
{file = "cryptography-42.0.4-cp37-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:3e970a2119507d0b104f0a8e281521ad28fc26f2820687b3436b8c9a5fcf20d1"},
{file = "cryptography-42.0.4-cp37-abi3-musllinux_1_1_aarch64.whl", hash = "sha256:e53dc41cda40b248ebc40b83b31516487f7db95ab8ceac1f042626bc43a2f992"},
{file = "cryptography-42.0.4-cp37-abi3-musllinux_1_1_x86_64.whl", hash = "sha256:c3a5cbc620e1e17009f30dd34cb0d85c987afd21c41a74352d1719be33380885"},
{file = "cryptography-42.0.4-cp37-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:6bfadd884e7280df24d26f2186e4e07556a05d37393b0f220a840b083dc6a824"},
{file = "cryptography-42.0.4-cp37-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:01911714117642a3f1792c7f376db572aadadbafcd8d75bb527166009c9f1d1b"},
{file = "cryptography-42.0.4-cp37-abi3-win32.whl", hash = "sha256:fb0cef872d8193e487fc6bdb08559c3aa41b659a7d9be48b2e10747f47863925"},
{file = "cryptography-42.0.4-cp37-abi3-win_amd64.whl", hash = "sha256:c1f25b252d2c87088abc8bbc4f1ecbf7c919e05508a7e8628e6875c40bc70923"},
{file = "cryptography-42.0.4-cp39-abi3-macosx_10_12_universal2.whl", hash = "sha256:15a1fb843c48b4a604663fa30af60818cd28f895572386e5f9b8a665874c26e7"},
{file = "cryptography-42.0.4-cp39-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a1327f280c824ff7885bdeef8578f74690e9079267c1c8bd7dc5cc5aa065ae52"},
{file = "cryptography-42.0.4-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6ffb03d419edcab93b4b19c22ee80c007fb2d708429cecebf1dd3258956a563a"},
{file = "cryptography-42.0.4-cp39-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:1df6fcbf60560d2113b5ed90f072dc0b108d64750d4cbd46a21ec882c7aefce9"},
{file = "cryptography-42.0.4-cp39-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:44a64043f743485925d3bcac548d05df0f9bb445c5fcca6681889c7c3ab12764"},
{file = "cryptography-42.0.4-cp39-abi3-musllinux_1_1_aarch64.whl", hash = "sha256:3c6048f217533d89f2f8f4f0fe3044bf0b2090453b7b73d0b77db47b80af8dff"},
{file = "cryptography-42.0.4-cp39-abi3-musllinux_1_1_x86_64.whl", hash = "sha256:6d0fbe73728c44ca3a241eff9aefe6496ab2656d6e7a4ea2459865f2e8613257"},
{file = "cryptography-42.0.4-cp39-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:887623fe0d70f48ab3f5e4dbf234986b1329a64c066d719432d0698522749929"},
{file = "cryptography-42.0.4-cp39-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:ce8613beaffc7c14f091497346ef117c1798c202b01153a8cc7b8e2ebaaf41c0"},
{file = "cryptography-42.0.4-cp39-abi3-win32.whl", hash = "sha256:810bcf151caefc03e51a3d61e53335cd5c7316c0a105cc695f0959f2c638b129"},
{file = "cryptography-42.0.4-cp39-abi3-win_amd64.whl", hash = "sha256:a0298bdc6e98ca21382afe914c642620370ce0470a01e1bef6dd9b5354c36854"},
{file = "cryptography-42.0.4-pp310-pypy310_pp73-macosx_10_12_x86_64.whl", hash = "sha256:5f8907fcf57392cd917892ae83708761c6ff3c37a8e835d7246ff0ad251d9298"},
{file = "cryptography-42.0.4-pp310-pypy310_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:12d341bd42cdb7d4937b0cabbdf2a94f949413ac4504904d0cdbdce4a22cbf88"},
{file = "cryptography-42.0.4-pp310-pypy310_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:1cdcdbd117681c88d717437ada72bdd5be9de117f96e3f4d50dab3f59fd9ab20"},
{file = "cryptography-42.0.4-pp310-pypy310_pp73-win_amd64.whl", hash = "sha256:0e89f7b84f421c56e7ff69f11c441ebda73b8a8e6488d322ef71746224c20fce"},
{file = "cryptography-42.0.4-pp39-pypy39_pp73-macosx_10_12_x86_64.whl", hash = "sha256:f1e85a178384bf19e36779d91ff35c7617c885da487d689b05c1366f9933ad74"},
{file = "cryptography-42.0.4-pp39-pypy39_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:d2a27aca5597c8a71abbe10209184e1a8e91c1fd470b5070a2ea60cafec35bcd"},
{file = "cryptography-42.0.4-pp39-pypy39_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:4e36685cb634af55e0677d435d425043967ac2f3790ec652b2b88ad03b85c27b"},
{file = "cryptography-42.0.4-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:f47be41843200f7faec0683ad751e5ef11b9a56a220d57f300376cd8aba81660"},
{file = "cryptography-42.0.4.tar.gz", hash = "sha256:831a4b37accef30cccd34fcb916a5d7b5be3cbbe27268a02832c3e450aea39cb"},
{file = "cryptography-42.0.2-cp37-abi3-macosx_10_12_universal2.whl", hash = "sha256:701171f825dcab90969596ce2af253143b93b08f1a716d4b2a9d2db5084ef7be"},
{file = "cryptography-42.0.2-cp37-abi3-macosx_10_12_x86_64.whl", hash = "sha256:61321672b3ac7aade25c40449ccedbc6db72c7f5f0fdf34def5e2f8b51ca530d"},
{file = "cryptography-42.0.2-cp37-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:ea2c3ffb662fec8bbbfce5602e2c159ff097a4631d96235fcf0fb00e59e3ece4"},
{file = "cryptography-42.0.2-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3b15c678f27d66d247132cbf13df2f75255627bcc9b6a570f7d2fd08e8c081d2"},
{file = "cryptography-42.0.2-cp37-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:8e88bb9eafbf6a4014d55fb222e7360eef53e613215085e65a13290577394529"},
{file = "cryptography-42.0.2-cp37-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:a047682d324ba56e61b7ea7c7299d51e61fd3bca7dad2ccc39b72bd0118d60a1"},
{file = "cryptography-42.0.2-cp37-abi3-musllinux_1_1_aarch64.whl", hash = "sha256:36d4b7c4be6411f58f60d9ce555a73df8406d484ba12a63549c88bd64f7967f1"},
{file = "cryptography-42.0.2-cp37-abi3-musllinux_1_1_x86_64.whl", hash = "sha256:a00aee5d1b6c20620161984f8ab2ab69134466c51f58c052c11b076715e72929"},
{file = "cryptography-42.0.2-cp37-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:b97fe7d7991c25e6a31e5d5e795986b18fbbb3107b873d5f3ae6dc9a103278e9"},
{file = "cryptography-42.0.2-cp37-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:5fa82a26f92871eca593b53359c12ad7949772462f887c35edaf36f87953c0e2"},
{file = "cryptography-42.0.2-cp37-abi3-win32.whl", hash = "sha256:4b063d3413f853e056161eb0c7724822a9740ad3caa24b8424d776cebf98e7ee"},
{file = "cryptography-42.0.2-cp37-abi3-win_amd64.whl", hash = "sha256:841ec8af7a8491ac76ec5a9522226e287187a3107e12b7d686ad354bb78facee"},
{file = "cryptography-42.0.2-cp39-abi3-macosx_10_12_universal2.whl", hash = "sha256:55d1580e2d7e17f45d19d3b12098e352f3a37fe86d380bf45846ef257054b242"},
{file = "cryptography-42.0.2-cp39-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:28cb2c41f131a5758d6ba6a0504150d644054fd9f3203a1e8e8d7ac3aea7f73a"},
{file = "cryptography-42.0.2-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b9097a208875fc7bbeb1286d0125d90bdfed961f61f214d3f5be62cd4ed8a446"},
{file = "cryptography-42.0.2-cp39-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:44c95c0e96b3cb628e8452ec060413a49002a247b2b9938989e23a2c8291fc90"},
{file = "cryptography-42.0.2-cp39-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:2f9f14185962e6a04ab32d1abe34eae8a9001569ee4edb64d2304bf0d65c53f3"},
{file = "cryptography-42.0.2-cp39-abi3-musllinux_1_1_aarch64.whl", hash = "sha256:09a77e5b2e8ca732a19a90c5bca2d124621a1edb5438c5daa2d2738bfeb02589"},
{file = "cryptography-42.0.2-cp39-abi3-musllinux_1_1_x86_64.whl", hash = "sha256:ad28cff53f60d99a928dfcf1e861e0b2ceb2bc1f08a074fdd601b314e1cc9e0a"},
{file = "cryptography-42.0.2-cp39-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:130c0f77022b2b9c99d8cebcdd834d81705f61c68e91ddd614ce74c657f8b3ea"},
{file = "cryptography-42.0.2-cp39-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:fa3dec4ba8fb6e662770b74f62f1a0c7d4e37e25b58b2bf2c1be4c95372b4a33"},
{file = "cryptography-42.0.2-cp39-abi3-win32.whl", hash = "sha256:3dbd37e14ce795b4af61b89b037d4bc157f2cb23e676fa16932185a04dfbf635"},
{file = "cryptography-42.0.2-cp39-abi3-win_amd64.whl", hash = "sha256:8a06641fb07d4e8f6c7dda4fc3f8871d327803ab6542e33831c7ccfdcb4d0ad6"},
{file = "cryptography-42.0.2-pp310-pypy310_pp73-macosx_10_12_x86_64.whl", hash = "sha256:087887e55e0b9c8724cf05361357875adb5c20dec27e5816b653492980d20380"},
{file = "cryptography-42.0.2-pp310-pypy310_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:a7ef8dd0bf2e1d0a27042b231a3baac6883cdd5557036f5e8df7139255feaac6"},
{file = "cryptography-42.0.2-pp310-pypy310_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:4383b47f45b14459cab66048d384614019965ba6c1a1a141f11b5a551cace1b2"},
{file = "cryptography-42.0.2-pp310-pypy310_pp73-win_amd64.whl", hash = "sha256:fbeb725c9dc799a574518109336acccaf1303c30d45c075c665c0793c2f79a7f"},
{file = "cryptography-42.0.2-pp39-pypy39_pp73-macosx_10_12_x86_64.whl", hash = "sha256:320948ab49883557a256eab46149df79435a22d2fefd6a66fe6946f1b9d9d008"},
{file = "cryptography-42.0.2-pp39-pypy39_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:5ef9bc3d046ce83c4bbf4c25e1e0547b9c441c01d30922d812e887dc5f125c12"},
{file = "cryptography-42.0.2-pp39-pypy39_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:52ed9ebf8ac602385126c9a2fe951db36f2cb0c2538d22971487f89d0de4065a"},
{file = "cryptography-42.0.2-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:141e2aa5ba100d3788c0ad7919b288f89d1fe015878b9659b307c9ef867d3a65"},
{file = "cryptography-42.0.2.tar.gz", hash = "sha256:e0ec52ba3c7f1b7d813cd52649a5b3ef1fc0d433219dc8c93827c57eab6cf888"},
]
[package.dependencies]
@@ -2182,7 +2182,6 @@ files = [
{file = "PyYAML-6.0.1-cp311-cp311-win_amd64.whl", hash = "sha256:bf07ee2fef7014951eeb99f56f39c9bb4af143d8aa3c21b1677805985307da34"},
{file = "PyYAML-6.0.1-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:855fb52b0dc35af121542a76b9a84f8d1cd886ea97c84703eaa6d88e37a2ad28"},
{file = "PyYAML-6.0.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:40df9b996c2b73138957fe23a16a4f0ba614f4c0efce1e9406a184b6d07fa3a9"},
{file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a08c6f0fe150303c1c6b71ebcd7213c2858041a7e01975da3a99aed1e7a378ef"},
{file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6c22bec3fbe2524cde73d7ada88f6566758a8f7227bfbf93a408a9d86bcc12a0"},
{file = "PyYAML-6.0.1-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:8d4e9c88387b0f5c7d5f281e55304de64cf7f9c0021a3525bd3b1c542da3b0e4"},
{file = "PyYAML-6.0.1-cp312-cp312-win32.whl", hash = "sha256:d483d2cdf104e7c9fa60c544d92981f12ad66a457afae824d146093b8c294c54"},
@@ -2572,16 +2571,6 @@ files = [
{file = "wrapt-1.14.1-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:8ad85f7f4e20964db4daadcab70b47ab05c7c1cf2a7c1e51087bfaa83831854c"},
{file = "wrapt-1.14.1-cp310-cp310-win32.whl", hash = "sha256:a9a52172be0b5aae932bef82a79ec0a0ce87288c7d132946d645eba03f0ad8a8"},
{file = "wrapt-1.14.1-cp310-cp310-win_amd64.whl", hash = "sha256:6d323e1554b3d22cfc03cd3243b5bb815a51f5249fdcbb86fda4bf62bab9e164"},
{file = "wrapt-1.14.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:ecee4132c6cd2ce5308e21672015ddfed1ff975ad0ac8d27168ea82e71413f55"},
{file = "wrapt-1.14.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:2020f391008ef874c6d9e208b24f28e31bcb85ccff4f335f15a3251d222b92d9"},
{file = "wrapt-1.14.1-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:2feecf86e1f7a86517cab34ae6c2f081fd2d0dac860cb0c0ded96d799d20b335"},
{file = "wrapt-1.14.1-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:240b1686f38ae665d1b15475966fe0472f78e71b1b4903c143a842659c8e4cb9"},
{file = "wrapt-1.14.1-cp311-cp311-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a9008dad07d71f68487c91e96579c8567c98ca4c3881b9b113bc7b33e9fd78b8"},
{file = "wrapt-1.14.1-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:6447e9f3ba72f8e2b985a1da758767698efa72723d5b59accefd716e9e8272bf"},
{file = "wrapt-1.14.1-cp311-cp311-musllinux_1_1_i686.whl", hash = "sha256:acae32e13a4153809db37405f5eba5bac5fbe2e2ba61ab227926a22901051c0a"},
{file = "wrapt-1.14.1-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:49ef582b7a1152ae2766557f0550a9fcbf7bbd76f43fbdc94dd3bf07cc7168be"},
{file = "wrapt-1.14.1-cp311-cp311-win32.whl", hash = "sha256:358fe87cc899c6bb0ddc185bf3dbfa4ba646f05b1b0b9b5a27c2cb92c2cea204"},
{file = "wrapt-1.14.1-cp311-cp311-win_amd64.whl", hash = "sha256:26046cd03936ae745a502abf44dac702a5e6880b2b01c29aea8ddf3353b68224"},
{file = "wrapt-1.14.1-cp35-cp35m-manylinux1_i686.whl", hash = "sha256:43ca3bbbe97af00f49efb06e352eae40434ca9d915906f77def219b88e85d907"},
{file = "wrapt-1.14.1-cp35-cp35m-manylinux1_x86_64.whl", hash = "sha256:6b1a564e6cb69922c7fe3a678b9f9a3c54e72b469875aa8018f18b4d1dd1adf3"},
{file = "wrapt-1.14.1-cp35-cp35m-manylinux2010_i686.whl", hash = "sha256:00b6d4ea20a906c0ca56d84f93065b398ab74b927a7a3dbd470f6fc503f95dc3"},

View File

@@ -168,11 +168,12 @@ impl CancelClosure {
cancel_token,
}
}
/// Cancels the query running on user's compute node.
pub async fn try_cancel_query(self) -> Result<(), CancelError> {
async fn try_cancel_query(self) -> Result<(), CancelError> {
let socket = TcpStream::connect(self.socket_addr).await?;
self.cancel_token.cancel_query_raw(socket, NoTls).await?;
info!("query was cancelled");
Ok(())
}
}

View File

@@ -1,5 +1,4 @@
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tracing::info;
use std::future::poll_fn;
use std::io;
@@ -40,51 +39,42 @@ where
}
}
#[tracing::instrument(skip_all)]
pub(super) async fn copy_bidirectional_client_compute<Client, Compute>(
client: &mut Client,
compute: &mut Compute,
pub(super) async fn copy_bidirectional<A, B>(
a: &mut A,
b: &mut B,
) -> Result<(u64, u64), std::io::Error>
where
Client: AsyncRead + AsyncWrite + Unpin + ?Sized,
Compute: AsyncRead + AsyncWrite + Unpin + ?Sized,
A: AsyncRead + AsyncWrite + Unpin + ?Sized,
B: AsyncRead + AsyncWrite + Unpin + ?Sized,
{
let mut client_to_compute = TransferState::Running(CopyBuffer::new());
let mut compute_to_client = TransferState::Running(CopyBuffer::new());
let mut a_to_b = TransferState::Running(CopyBuffer::new());
let mut b_to_a = TransferState::Running(CopyBuffer::new());
poll_fn(|cx| {
let mut client_to_compute_result =
transfer_one_direction(cx, &mut client_to_compute, client, compute)?;
let mut compute_to_client_result =
transfer_one_direction(cx, &mut compute_to_client, compute, client)?;
let mut a_to_b_result = transfer_one_direction(cx, &mut a_to_b, a, b)?;
let mut b_to_a_result = transfer_one_direction(cx, &mut b_to_a, b, a)?;
// Early termination checks from compute to client.
if let TransferState::Done(_) = compute_to_client {
if let TransferState::Running(buf) = &client_to_compute {
info!("Compute is done, terminate client");
// Early termination checks
if let TransferState::Done(_) = a_to_b {
if let TransferState::Running(buf) = &b_to_a {
// Initiate shutdown
client_to_compute = TransferState::ShuttingDown(buf.amt);
client_to_compute_result =
transfer_one_direction(cx, &mut client_to_compute, client, compute)?;
b_to_a = TransferState::ShuttingDown(buf.amt);
b_to_a_result = transfer_one_direction(cx, &mut b_to_a, b, a)?;
}
}
// Early termination checks from compute to client.
if let TransferState::Done(_) = client_to_compute {
if let TransferState::Running(buf) = &compute_to_client {
info!("Client is done, terminate compute");
if let TransferState::Done(_) = b_to_a {
if let TransferState::Running(buf) = &a_to_b {
// Initiate shutdown
compute_to_client = TransferState::ShuttingDown(buf.amt);
compute_to_client_result =
transfer_one_direction(cx, &mut compute_to_client, client, compute)?;
a_to_b = TransferState::ShuttingDown(buf.amt);
a_to_b_result = transfer_one_direction(cx, &mut a_to_b, a, b)?;
}
}
// It is not a problem if ready! returns early ... (comment remains the same)
let client_to_compute = ready!(client_to_compute_result);
let compute_to_client = ready!(compute_to_client_result);
let a_to_b = ready!(a_to_b_result);
let b_to_a = ready!(b_to_a_result);
Poll::Ready(Ok((client_to_compute, compute_to_client)))
Poll::Ready(Ok((a_to_b, b_to_a)))
})
.await
}
@@ -229,46 +219,38 @@ mod tests {
use tokio::io::AsyncWriteExt;
#[tokio::test]
async fn test_client_to_compute() {
let (mut client_client, mut client_proxy) = tokio::io::duplex(8); // Create a mock duplex stream
let (mut compute_proxy, mut compute_client) = tokio::io::duplex(32); // Create a mock duplex stream
async fn test_early_termination_a_to_d() {
let (mut a_mock, mut b_mock) = tokio::io::duplex(8); // Create a mock duplex stream
let (mut c_mock, mut d_mock) = tokio::io::duplex(32); // Create a mock duplex stream
// Simulate 'a' finishing while there's still data for 'b'
client_client.write_all(b"hello").await.unwrap();
client_client.shutdown().await.unwrap();
compute_client.write_all(b"Neon").await.unwrap();
compute_client.shutdown().await.unwrap();
a_mock.write_all(b"hello").await.unwrap();
a_mock.shutdown().await.unwrap();
d_mock.write_all(b"Neon Serverless Postgres").await.unwrap();
let result = copy_bidirectional_client_compute(&mut client_proxy, &mut compute_proxy)
.await
.unwrap();
let result = copy_bidirectional(&mut b_mock, &mut c_mock).await.unwrap();
// Assert correct transferred amounts
let (client_to_compute_count, compute_to_client_count) = result;
assert_eq!(client_to_compute_count, 5); // 'hello' was transferred
assert_eq!(compute_to_client_count, 4); // response only partially transferred or not at all
let (a_to_d_count, d_to_a_count) = result;
assert_eq!(a_to_d_count, 5); // 'hello' was transferred
assert!(d_to_a_count <= 8); // response only partially transferred or not at all
}
#[tokio::test]
async fn test_compute_to_client() {
let (mut client_client, mut client_proxy) = tokio::io::duplex(32); // Create a mock duplex stream
let (mut compute_proxy, mut compute_client) = tokio::io::duplex(8); // Create a mock duplex stream
async fn test_early_termination_d_to_a() {
let (mut a_mock, mut b_mock) = tokio::io::duplex(32); // Create a mock duplex stream
let (mut c_mock, mut d_mock) = tokio::io::duplex(8); // Create a mock duplex stream
// Simulate 'a' finishing while there's still data for 'b'
compute_client.write_all(b"hello").await.unwrap();
compute_client.shutdown().await.unwrap();
client_client
.write_all(b"Neon Serverless Postgres")
.await
.unwrap();
d_mock.write_all(b"hello").await.unwrap();
d_mock.shutdown().await.unwrap();
a_mock.write_all(b"Neon Serverless Postgres").await.unwrap();
let result = copy_bidirectional_client_compute(&mut client_proxy, &mut compute_proxy)
.await
.unwrap();
let result = copy_bidirectional(&mut b_mock, &mut c_mock).await.unwrap();
// Assert correct transferred amounts
let (client_to_compute_count, compute_to_client_count) = result;
assert_eq!(compute_to_client_count, 5); // 'hello' was transferred
assert!(client_to_compute_count <= 8); // response only partially transferred or not at all
let (a_to_d_count, d_to_a_count) = result;
assert_eq!(d_to_a_count, 5); // 'hello' was transferred
assert!(a_to_d_count <= 8); // response only partially transferred or not at all
}
}

View File

@@ -46,11 +46,7 @@ pub async fn proxy_pass(
// Starting from here we only proxy the client's traffic.
info!("performing the proxy pass...");
let _ = crate::proxy::copy_bidirectional::copy_bidirectional_client_compute(
&mut client,
&mut compute,
)
.await?;
let _ = crate::proxy::copy_bidirectional::copy_bidirectional(&mut client, &mut compute).await?;
Ok(())
}
@@ -67,8 +63,6 @@ pub struct ProxyPassthrough<S> {
impl<S: AsyncRead + AsyncWrite + Unpin> ProxyPassthrough<S> {
pub async fn proxy_pass(self) -> anyhow::Result<()> {
let res = proxy_pass(self.client, self.compute.stream, self.aux).await;
self.compute.cancel_closure.try_cancel_query().await?;
res
proxy_pass(self.client, self.compute.stream, self.aux).await
}
}

View File

@@ -302,15 +302,6 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
)
self.verbose_error(res)
def tenant_list_locations(self):
res = self.get(
f"http://localhost:{self.port}/v1/location_config",
)
self.verbose_error(res)
res_json = res.json()
assert isinstance(res_json["tenant_shards"], list)
return res_json
def tenant_delete(self, tenant_id: Union[TenantId, TenantShardId]):
res = self.delete(f"http://localhost:{self.port}/v1/tenant/{tenant_id}")
self.verbose_error(res)

View File

@@ -15,7 +15,7 @@ def test_migrations(neon_simple_env: NeonEnv):
endpoint.wait_for_migrations()
num_migrations = 6
num_migrations = 4
with endpoint.cursor() as cur:
cur.execute("SELECT id FROM neon_migration.migration_id")

View File

@@ -211,6 +211,7 @@ def test_ondemand_download_timetravel(neon_env_builder: NeonEnvBuilder):
wait_for_upload(client, tenant_id, timeline_id, current_lsn)
wait_for_upload_queue_empty(pageserver_http, env.initial_tenant, timeline_id)
client.deletion_queue_flush(execute=True)
del current_lsn
env.pageserver.stop()
env.pageserver.start()
# We've shut down the SKs, then restarted the PSes to sever all walreceiver connections;

View File

@@ -235,6 +235,11 @@ def test_sharding_split_smoke(
all_shards = tenant_get_shards(env, tenant_id)
for tenant_shard_id, pageserver in all_shards:
pageserver.http_client().timeline_gc(tenant_shard_id, timeline_id, None)
# Restart all nodes, to check that the newly created shards are durable
for ps in env.pageservers:
ps.restart()
workload.validate()
migrate_to_pageserver_ids = list(
@@ -283,32 +288,6 @@ def test_sharding_split_smoke(
env.attachment_service.consistency_check()
# Validate pageserver state
shards_exist: list[TenantShardId] = []
for pageserver in env.pageservers:
locations = pageserver.http_client().tenant_list_locations()
shards_exist.extend(TenantShardId.parse(s[0]) for s in locations["tenant_shards"])
log.info("Shards after split: {shards_exist}")
assert len(shards_exist) == split_shard_count
# Ensure post-split pageserver locations survive a restart (i.e. the child shards
# correctly wrote config to disk, and the storage controller responds correctly
# to /re-attach)
for pageserver in env.pageservers:
pageserver.stop()
pageserver.start()
shards_exist = []
for pageserver in env.pageservers:
locations = pageserver.http_client().tenant_list_locations()
shards_exist.extend(TenantShardId.parse(s[0]) for s in locations["tenant_shards"])
log.info("Shards after restart: {shards_exist}")
assert len(shards_exist) == split_shard_count
workload.validate()
@pytest.mark.skipif(
# The quantity of data isn't huge, but debug can be _very_ slow, and the things we're

View File

@@ -125,20 +125,6 @@ def test_sharding_service_smoke(
time.sleep(1)
assert get_node_shard_counts(env, tenant_ids)[env.pageservers[0].id] == 0
# Restarting a pageserver should not detach any tenants (i.e. /re-attach works)
before_restart = env.pageservers[1].http_client().tenant_list_locations()
env.pageservers[1].stop()
env.pageservers[1].start()
after_restart = env.pageservers[1].http_client().tenant_list_locations()
assert len(after_restart) == len(before_restart)
# Locations should be the same before & after restart, apart from generations
for _shard_id, tenant in after_restart["tenant_shards"]:
del tenant["generation"]
for _shard_id, tenant in before_restart["tenant_shards"]:
del tenant["generation"]
assert before_restart == after_restart
# Delete all the tenants
for tid in tenant_ids:
tenant_delete_wait_completed(env.attachment_service.pageserver_api(), tid, 10)
@@ -272,13 +258,8 @@ def test_sharding_service_onboarding(
env.broker.try_start()
env.attachment_service.start()
# This is the pageserver where we'll initially create the tenant. Run it in emergency
# mode so that it doesn't talk to storage controller, and do not register it.
env.pageservers[0].allowed_errors.append(".*Emergency mode!.*")
env.pageservers[0].start(
overrides=("--pageserver-config-override=control_plane_emergency_mode=true",),
register=False,
)
# This is the pageserver where we'll initially create the tenant
env.pageservers[0].start(register=False)
origin_ps = env.pageservers[0]
# This is the pageserver managed by the sharding service, where the tenant

View File

@@ -1,5 +1,5 @@
{
"postgres-v16": "9c37a4988463a97d9cacb321acf3828b09823269",
"postgres-v15": "ca2def999368d9df098a637234ad5a9003189463",
"postgres-v14": "9dd9956c55ffbbd9abe77d10382453757fedfcf5"
"postgres-v16": "dc40299045a377ec3b302c900134468a1b0f58ee",
"postgres-v15": "0baccce15a3b0446af5c403d2e869a04541b63c4",
"postgres-v14": "17101190de8a54b95e0831c66c3da426ed33db34"
}