Compare commits

..

9 Commits

Author SHA1 Message Date
Arseny Sher
5598f712fa Fix LR tests waiting for synced data.
Even if pg_stat_subscription.latest_end_lsn has caught up, some tables might not
be synced yet, because the main apply worker keeps advancing while the sync
workers are still copying tables.
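A correct wait therefore has to look at per-table sync state rather than only at `pg_stat_subscription.latest_end_lsn`. A minimal, hedged sketch of such a check (not the project's actual test helper; it assumes a `tokio_postgres::Client` connected to the subscriber database):

```rust
use std::time::Duration;
use tokio_postgres::Client;

/// Poll until every table in pg_subscription_rel has finished initial sync.
/// srsubstate values: 'i' init, 'd' data copy, 'f' finished copy, 's' synced, 'r' ready.
async fn wait_for_tables_synced(client: &Client) -> Result<(), tokio_postgres::Error> {
    loop {
        let row = client
            .query_one(
                "SELECT count(*) FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's')",
                &[],
            )
            .await?;
        let pending: i64 = row.get(0);
        if pending == 0 {
            return Ok(());
        }
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
}
```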
2024-03-13 13:42:49 +03:00
Conrad Ludgate
83855a907c proxy http error classification (#7098)
## Problem

Missing error classification for SQL-over-HTTP queries.
Not respecting `UserFacingError` for SQL-over-HTTP queries.

## Summary of changes

Adds error classification.
Adds user-facing errors.
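A simplified sketch of the classification pattern, using locally defined stand-ins for the proxy's `ReportableError`/`UserFacingError` traits (the trait and variant names are taken from the diff below; the real definitions live in the proxy crate and may differ):

```rust
// Locally defined stand-ins for illustration; the proxy crate has its own versions.
#[derive(Debug, Clone, Copy)]
enum ErrorKind {
    User,
    Compute,
}

trait ReportableError {
    /// Classify the error for metrics and alerting.
    fn get_error_kind(&self) -> ErrorKind;
}

trait UserFacingError: std::fmt::Display {
    /// A message that is safe to show to the end user.
    fn to_string_client(&self) -> String;
}

#[derive(Debug)]
enum SqlOverHttpExampleError {
    BadRequestHeader(&'static str),
    ComputeUnreachable,
}

impl std::fmt::Display for SqlOverHttpExampleError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::BadRequestHeader(h) => write!(f, "malformed request header: {h}"),
            Self::ComputeUnreachable => write!(f, "could not reach compute node"),
        }
    }
}

impl ReportableError for SqlOverHttpExampleError {
    fn get_error_kind(&self) -> ErrorKind {
        match self {
            Self::BadRequestHeader(_) => ErrorKind::User,
            Self::ComputeUnreachable => ErrorKind::Compute,
        }
    }
}

impl UserFacingError for SqlOverHttpExampleError {
    fn to_string_client(&self) -> String {
        // For this toy error every variant is safe to show verbatim.
        self.to_string()
    }
}
```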
2024-03-13 07:35:49 +01:00
John Spray
1b41db8bdd pageserver: enable setting stripe size inline with split request. (#7093)
## Summary

- Currently we can set the stripe size at tenant creation, but it doesn't
mean anything until the tenant has multiple shards.
- When onboarding an existing tenant, it always gets the default shard
stripe size, so we would like to be able to pick the actual stripe size
at the point we split.

## Why do this inline with a split?

The alternative to this change would be to have a separate endpoint on
the storage controller for setting the stripe size on a tenant, and only
permit writes to that endpoint when the tenant has only a single shard.
That would work, but it would be a little more work for a client, and not
appreciably simpler (instead of having a special argument to the split
functions, we'd have a special separate endpoint, and a requirement that
the controller must sync its config down to the pageserver before
calling the split API). Either approach would work, but this one feels a
bit more robust end-to-end: the split API is the _very last moment_ that
the stripe size is mutable, so if we aim to set it before splitting, it
makes sense to do it as part of the same operation.
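Based on the diff below, the split request to the storage controller now carries the stripe size inline. A hedged sketch of what the request body looks like, using a local mirror of the request type (the real one is `TenantShardSplitRequest` in `pageserver_api`, where the stripe size is a `ShardStripeSize` newtype rather than a bare `u32`):

```rust
use serde::Serialize;

// Local mirror for illustration only.
#[derive(Serialize)]
struct TenantShardSplitRequest {
    new_shard_count: u8,
    // Only honoured while the tenant still has a single shard; the controller
    // returns 400 if a different stripe size is requested once multiple shards exist.
    new_stripe_size: Option<u32>,
}

fn main() {
    let req = TenantShardSplitRequest {
        new_shard_count: 4,
        new_stripe_size: Some(32768), // stripe size in pages; illustrative value
    };
    // Sent as `PUT control/v1/tenant/{tenant_id}/shard_split` (see the storage
    // controller client change further down in this diff).
    println!("{}", serde_json::to_string_pretty(&req).unwrap());
}
```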
2024-03-12 20:41:08 +00:00
Jure Bajic
bac06ea1ac pageserver: fix read path max lsn bug (#7007)
## Summary of changes
The problem this fixes: when `request_lsn` is `u64::MAX-1`, `cont_lsn`
becomes `u64::MAX`, which equals the initial `prev_lsn` sentinel and
terminates the traversal loop prematurely.

Closes https://github.com/neondatabase/neon/issues/6812
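A minimal sketch of the sentinel collision described above (types simplified to bare `u64`; the actual fix further down in this diff replaces the `Lsn(u64::MAX)` sentinel with an `Option`):

```rust
fn main() {
    // Before the fix, `prev_lsn` started at a u64::MAX sentinel meaning
    // "no previous iteration".
    let request_lsn: u64 = u64::MAX - 1;
    let prev_lsn: u64 = u64::MAX;
    let cont_lsn: u64 = request_lsn + 1; // == u64::MAX

    // The "no progress" check then fires on the very first iteration:
    assert!(prev_lsn <= cont_lsn);

    // With an Option there is no sentinel value to collide with:
    let prev_lsn: Option<u64> = None;
    assert!(!prev_lsn.map_or(false, |prev| prev <= cont_lsn));
}
```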
2024-03-12 16:32:47 +00:00
John Spray
7ae8364b0b storage controller: register nodes in re-attach request (#7040)
## Problem

Currently we manually register nodes with the storage controller, and
use a script during deploy to register with the cloud control plane.
Rather than extend that script further, nodes should just register on
startup.

## Summary of changes

- Extend the re-attach request to include an optional
NodeRegisterRequest
- If the `register` field is set, handle it like a normal node
registration before executing the normal re-attach work.
- Update tests/neon_local, which used to rely on an explicit register
step that could be enabled/disabled.

---------

Co-authored-by: Christian Schwarz <christian@neon.tech>
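A hedged sketch of what the extended re-attach body looks like on the wire, using local mirrors of the request types from the diff below (field names are taken from `ReAttachRequest`/`NodeRegisterRequest`; the values and the plain `u64` node id are illustrative):

```rust
use serde::Serialize;

// Local mirrors for illustration; the real types live in `pageserver_api`.
#[derive(Serialize)]
struct NodeRegisterRequest {
    node_id: u64,
    listen_pg_addr: String,
    listen_pg_port: u16,
    listen_http_addr: String,
    listen_http_port: u16,
}

#[derive(Serialize)]
struct ReAttachRequest {
    node_id: u64,
    /// Optional inline self-registration; omitted from the JSON when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    register: Option<NodeRegisterRequest>,
}

fn main() {
    let req = ReAttachRequest {
        node_id: 1,
        register: Some(NodeRegisterRequest {
            node_id: 1,
            listen_pg_addr: "localhost".to_string(),
            listen_pg_port: 64000,
            listen_http_addr: "localhost".to_string(),
            listen_http_port: 9898,
        }),
    };
    println!("{}", serde_json::to_string_pretty(&req).unwrap());
}
```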
2024-03-12 14:47:12 +00:00
Conrad Ludgate
1f7d54f987 proxy refactor tls listener (#7056)
## Problem

Now that we have tls-listener vendored, we can refactor it, remove a lot
of bloated code, and make the whole flow a bit simpler.

## Summary of changes

1. Remove dead code
2. Move the error handling inside the `TlsListener` accept() function
3. Extract the peer_addr from the PROXY protocol header and log it with
errors (see the sketch below)
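For context on item 3, a minimal, hedged sketch of pulling the client address out of a PROXY protocol v1 header line (the proxy's actual `WithClientIp` implementation differs, and also has to handle the binary v2 format and non-proxied connections):

```rust
use std::net::IpAddr;

/// Parse a PROXY protocol v1 header line such as
/// "PROXY TCP4 192.0.2.1 198.51.100.2 56324 443\r\n"
/// and return the client (source) address, if present.
fn parse_proxy_v1_client_addr(line: &str) -> Option<(IpAddr, u16)> {
    let line = line.trim_end_matches("\r\n");
    let mut parts = line.split(' ');
    if parts.next() != Some("PROXY") {
        return None;
    }
    match parts.next() {
        Some("TCP4") | Some("TCP6") => {}
        // "UNKNOWN" carries no usable address information.
        _ => return None,
    }
    let src_ip: IpAddr = parts.next()?.parse().ok()?;
    let _dst_ip: IpAddr = parts.next()?.parse().ok()?;
    let src_port: u16 = parts.next()?.parse().ok()?;
    Some((src_ip, src_port))
}

fn main() {
    let header = "PROXY TCP4 192.0.2.1 198.51.100.2 56324 443\r\n";
    assert_eq!(
        parse_proxy_v1_client_addr(header),
        Some(("192.0.2.1".parse().unwrap(), 56324))
    );
}
```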
2024-03-12 13:05:40 +00:00
Arthur Petukhovsky
580e136b2e Forward all backpressure feedback to compute (#7079)
Previously we aggregated ps_feedback on each safekeeper and sent it to
the walproposer with every AppendResponse. This PR changes it to forward
ps_feedback to the walproposer right after receiving it from the pageserver,
without aggregating it in memory. It also contains some preparation for
implementing backpressure support for sharding.
2024-03-12 12:14:02 +00:00
Conrad Ludgate
09699d4bd8 proxy: cancel http queries on timeout (#7031)
## Problem

On HTTP query timeout, we should try to cancel the current in-flight
SQL query.

## Summary of changes

Trigger a cancellation command in Postgres once the timeout is reached.
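The diff below implements this by arming a timer task that cancels a `CancellationToken`, instead of wrapping the handler in `tokio::time::timeout` and dropping the future. A rough sketch of that pattern (simplified; assumes the `tokio` and `tokio-util` crates with default runtime features):

```rust
use std::time::Duration;
use tokio_util::sync::CancellationToken;

async fn run_query(cancel: CancellationToken) -> Result<&'static str, &'static str> {
    tokio::select! {
        // Real code would issue the SQL query here and, on cancellation,
        // send a Postgres cancel request for the in-flight statement.
        _ = tokio::time::sleep(Duration::from_secs(60)) => Ok("query result"),
        _ = cancel.cancelled() => Err("query cancelled after timeout"),
    }
}

#[tokio::main]
async fn main() {
    let cancel = CancellationToken::new();
    let cancel2 = cancel.clone();

    // Arm the timeout: cancel the token rather than aborting the request future,
    // so the handler gets a chance to cancel the in-flight SQL statement.
    let timer = tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(100)).await;
        cancel2.cancel();
    });

    let result = run_query(cancel).await;
    timer.abort();
    println!("{result:?}");
}
```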
2024-03-12 11:52:00 +00:00
John Spray
89cf714890 tests/neon_local: rename "attachment service" -> "storage controller" (#7087)
Not a user-facing change, but can break any existing `.neon` directories
created by neon_local, as the name of the database used by the storage
controller changes.

This PR changes all the locations apart from the path of
`control_plane/attachment_service` (waiting for an opportune moment to
do that one, because it's the most likely to conflict with ongoing PRs
like #6676).
2024-03-12 11:36:27 +00:00
57 changed files with 1313 additions and 899 deletions

View File

@@ -51,7 +51,7 @@ CARGO_BUILD_FLAGS += $(filter -j1,$(MAKEFLAGS))
CARGO_CMD_PREFIX += $(if $(filter n,$(MAKEFLAGS)),,+)
# Force cargo not to print progress bar
CARGO_CMD_PREFIX += CARGO_TERM_PROGRESS_WHEN=never CI=1
# Set PQ_LIB_DIR to make sure `attachment_service` get linked with bundled libpq (through diesel)
# Set PQ_LIB_DIR to make sure `storage_controller` get linked with bundled libpq (through diesel)
CARGO_CMD_PREFIX += PQ_LIB_DIR=$(POSTGRES_INSTALL_DIR)/v16/lib
#

View File

@@ -30,7 +30,7 @@ use pageserver_api::controller_api::{
};
use pageserver_api::upcall_api::{ReAttachRequest, ValidateRequest};
use control_plane::attachment_service::{AttachHookRequest, InspectRequest};
use control_plane::storage_controller::{AttachHookRequest, InspectRequest};
/// State available to HTTP request handlers
#[derive(Clone)]

View File

@@ -1,9 +1,3 @@
/// The attachment service mimics the aspects of the control plane API
/// that are required for a pageserver to operate.
///
/// This enables running & testing pageservers without a full-blown
/// deployment of the Neon cloud platform.
///
use anyhow::{anyhow, Context};
use attachment_service::http::make_router;
use attachment_service::metrics::preinitialize_metrics;

View File

@@ -20,7 +20,7 @@ use crate::node::Node;
/// ## What do we store?
///
/// The attachment service does not store most of its state durably.
/// The storage controller service does not store most of its state durably.
///
/// The essential things to store durably are:
/// - generation numbers, as these must always advance monotonically to ensure data safety.
@@ -34,7 +34,7 @@ use crate::node::Node;
///
/// ## Performance/efficiency
///
/// The attachment service does not go via the database for most things: there are
/// The storage controller service does not go via the database for most things: there are
/// a couple of places where we must, and where efficiency matters:
/// - Incrementing generation numbers: the Reconciler has to wait for this to complete
/// before it can attach a tenant, so this acts as a bound on how fast things like

View File

@@ -8,7 +8,7 @@ use std::{
};
use anyhow::Context;
use control_plane::attachment_service::{
use control_plane::storage_controller::{
AttachHookRequest, AttachHookResponse, InspectRequest, InspectResponse,
};
use diesel::result::DatabaseErrorKind;
@@ -839,7 +839,7 @@ impl Service {
tenant_state.generation = Some(new_generation);
} else {
// This is a detach notification. We must update placement policy to avoid re-attaching
// during background scheduling/reconciliation, or during attachment service restart.
// during background scheduling/reconciliation, or during storage controller restart.
assert!(attach_req.node_id.is_none());
tenant_state.policy = PlacementPolicy::Detached;
}
@@ -922,6 +922,10 @@ impl Service {
&self,
reattach_req: ReAttachRequest,
) -> Result<ReAttachResponse, ApiError> {
if let Some(register_req) = reattach_req.register {
self.node_register(register_req).await?;
}
// Take a re-attach as indication that the node is available: this is a precursor to proper
// heartbeating in https://github.com/neondatabase/neon/issues/6844
self.node_configure(NodeConfigureRequest {
@@ -2218,7 +2222,18 @@ impl Service {
// unwrap safety: we would have returned above if we didn't find at least one shard to split
let old_shard_count = old_shard_count.unwrap();
let shard_ident = shard_ident.unwrap();
let shard_ident = if let Some(new_stripe_size) = split_req.new_stripe_size {
// This ShardIdentity will be used as the template for all children, so this implicitly
// applies the new stripe size to the children.
let mut shard_ident = shard_ident.unwrap();
if shard_ident.count.count() > 1 && shard_ident.stripe_size != new_stripe_size {
return Err(ApiError::BadRequest(anyhow::anyhow!("Attempted to change stripe size ({:?}->{new_stripe_size:?}) on a tenant with multiple shards", shard_ident.stripe_size)));
}
shard_ident.stripe_size = new_stripe_size;
shard_ident
} else {
shard_ident.unwrap()
};
let policy = policy.unwrap();
// FIXME: we have dropped self.inner lock, and not yet written anything to the database: another
@@ -2310,6 +2325,7 @@ impl Service {
*parent_id,
TenantShardSplitRequest {
new_shard_count: split_req.new_shard_count,
new_stripe_size: split_req.new_stripe_size,
},
)
.await

View File

@@ -8,11 +8,11 @@
use anyhow::{anyhow, bail, Context, Result};
use clap::{value_parser, Arg, ArgAction, ArgMatches, Command, ValueEnum};
use compute_api::spec::ComputeMode;
use control_plane::attachment_service::AttachmentService;
use control_plane::endpoint::ComputeControlPlane;
use control_plane::local_env::{InitForceMode, LocalEnv};
use control_plane::pageserver::{PageServerNode, PAGESERVER_REMOTE_STORAGE_DIR};
use control_plane::safekeeper::SafekeeperNode;
use control_plane::storage_controller::StorageController;
use control_plane::{broker, local_env};
use pageserver_api::controller_api::{
NodeAvailability, NodeConfigureRequest, NodeSchedulingPolicy, PlacementPolicy,
@@ -138,7 +138,7 @@ fn main() -> Result<()> {
"start" => rt.block_on(handle_start_all(sub_args, &env)),
"stop" => rt.block_on(handle_stop_all(sub_args, &env)),
"pageserver" => rt.block_on(handle_pageserver(sub_args, &env)),
"attachment_service" => rt.block_on(handle_attachment_service(sub_args, &env)),
"storage_controller" => rt.block_on(handle_storage_controller(sub_args, &env)),
"safekeeper" => rt.block_on(handle_safekeeper(sub_args, &env)),
"endpoint" => rt.block_on(handle_endpoint(sub_args, &env)),
"mappings" => handle_mappings(sub_args, &mut env),
@@ -445,14 +445,14 @@ async fn handle_tenant(
// If tenant ID was not specified, generate one
let tenant_id = parse_tenant_id(create_match)?.unwrap_or_else(TenantId::generate);
// We must register the tenant with the attachment service, so
// We must register the tenant with the storage controller, so
// that when the pageserver restarts, it will be re-attached.
let attachment_service = AttachmentService::from_env(env);
attachment_service
let storage_controller = StorageController::from_env(env);
storage_controller
.tenant_create(TenantCreateRequest {
// Note that ::unsharded here isn't actually because the tenant is unsharded, its because the
// attachment service expecfs a shard-naive tenant_id in this attribute, and the TenantCreateRequest
// type is used both in attachment service (for creating tenants) and in pageserver (for creating shards)
// storage controller expecfs a shard-naive tenant_id in this attribute, and the TenantCreateRequest
// type is used both in storage controller (for creating tenants) and in pageserver (for creating shards)
new_tenant_id: TenantShardId::unsharded(tenant_id),
generation: None,
shard_parameters: ShardParameters {
@@ -476,9 +476,9 @@ async fn handle_tenant(
.context("Failed to parse postgres version from the argument string")?;
// FIXME: passing None for ancestor_start_lsn is not kosher in a sharded world: we can't have
// different shards picking different start lsns. Maybe we have to teach attachment service
// different shards picking different start lsns. Maybe we have to teach storage controller
// to let shard 0 branch first and then propagate the chosen LSN to other shards.
attachment_service
storage_controller
.tenant_timeline_create(
tenant_id,
TimelineCreateRequest {
@@ -528,8 +528,8 @@ async fn handle_tenant(
let new_pageserver = get_pageserver(env, matches)?;
let new_pageserver_id = new_pageserver.conf.id;
let attachment_service = AttachmentService::from_env(env);
attachment_service
let storage_controller = StorageController::from_env(env);
storage_controller
.tenant_migrate(tenant_shard_id, new_pageserver_id)
.await?;
@@ -543,8 +543,8 @@ async fn handle_tenant(
let mut tenant_synthetic_size = None;
let attachment_service = AttachmentService::from_env(env);
for shard in attachment_service.tenant_locate(tenant_id).await?.shards {
let storage_controller = StorageController::from_env(env);
for shard in storage_controller.tenant_locate(tenant_id).await?.shards {
let pageserver =
PageServerNode::from_env(env, env.get_pageserver_conf(shard.node_id)?);
@@ -585,10 +585,14 @@ async fn handle_tenant(
Some(("shard-split", matches)) => {
let tenant_id = get_tenant_id(matches, env)?;
let shard_count: u8 = matches.get_one::<u8>("shard-count").cloned().unwrap_or(0);
let shard_stripe_size: Option<ShardStripeSize> = matches
.get_one::<Option<ShardStripeSize>>("shard-stripe-size")
.cloned()
.unwrap();
let attachment_service = AttachmentService::from_env(env);
let result = attachment_service
.tenant_split(tenant_id, shard_count)
let storage_controller = StorageController::from_env(env);
let result = storage_controller
.tenant_split(tenant_id, shard_count, shard_stripe_size)
.await?;
println!(
"Split tenant {} into shards {}",
@@ -613,7 +617,7 @@ async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::Local
match timeline_match.subcommand() {
Some(("list", list_match)) => {
// TODO(sharding): this command shouldn't have to specify a shard ID: we should ask the attachment service
// TODO(sharding): this command shouldn't have to specify a shard ID: we should ask the storage controller
// where shard 0 is attached, and query there.
let tenant_shard_id = get_tenant_shard_id(list_match, env)?;
let timelines = pageserver.timeline_list(&tenant_shard_id).await?;
@@ -633,7 +637,7 @@ async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::Local
let new_timeline_id_opt = parse_timeline_id(create_match)?;
let new_timeline_id = new_timeline_id_opt.unwrap_or(TimelineId::generate());
let attachment_service = AttachmentService::from_env(env);
let storage_controller = StorageController::from_env(env);
let create_req = TimelineCreateRequest {
new_timeline_id,
ancestor_timeline_id: None,
@@ -641,7 +645,7 @@ async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::Local
ancestor_start_lsn: None,
pg_version: Some(pg_version),
};
let timeline_info = attachment_service
let timeline_info = storage_controller
.tenant_timeline_create(tenant_id, create_req)
.await?;
@@ -730,7 +734,7 @@ async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::Local
.transpose()
.context("Failed to parse ancestor start Lsn from the request")?;
let new_timeline_id = TimelineId::generate();
let attachment_service = AttachmentService::from_env(env);
let storage_controller = StorageController::from_env(env);
let create_req = TimelineCreateRequest {
new_timeline_id,
ancestor_timeline_id: Some(ancestor_timeline_id),
@@ -738,7 +742,7 @@ async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::Local
ancestor_start_lsn: start_lsn,
pg_version: None,
};
let timeline_info = attachment_service
let timeline_info = storage_controller
.tenant_timeline_create(tenant_id, create_req)
.await?;
@@ -767,7 +771,7 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re
match sub_name {
"list" => {
// TODO(sharding): this command shouldn't have to specify a shard ID: we should ask the attachment service
// TODO(sharding): this command shouldn't have to specify a shard ID: we should ask the storage controller
// where shard 0 is attached, and query there.
let tenant_shard_id = get_tenant_shard_id(sub_args, env)?;
let timeline_infos = get_timeline_infos(env, &tenant_shard_id)
@@ -952,21 +956,21 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re
(
vec![(parsed.0, parsed.1.unwrap_or(5432))],
// If caller is telling us what pageserver to use, this is not a tenant which is
// full managed by attachment service, therefore not sharded.
// full managed by storage controller, therefore not sharded.
ShardParameters::DEFAULT_STRIPE_SIZE,
)
} else {
// Look up the currently attached location of the tenant, and its striping metadata,
// to pass these on to postgres.
let attachment_service = AttachmentService::from_env(env);
let locate_result = attachment_service.tenant_locate(endpoint.tenant_id).await?;
let storage_controller = StorageController::from_env(env);
let locate_result = storage_controller.tenant_locate(endpoint.tenant_id).await?;
let pageservers = locate_result
.shards
.into_iter()
.map(|shard| {
(
Host::parse(&shard.listen_pg_addr)
.expect("Attachment service reported bad hostname"),
.expect("Storage controller reported bad hostname"),
shard.listen_pg_port,
)
})
@@ -1015,8 +1019,8 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re
pageserver.pg_connection_config.port(),
)]
} else {
let attachment_service = AttachmentService::from_env(env);
attachment_service
let storage_controller = StorageController::from_env(env);
storage_controller
.tenant_locate(endpoint.tenant_id)
.await?
.shards
@@ -1024,7 +1028,7 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re
.map(|shard| {
(
Host::parse(&shard.listen_pg_addr)
.expect("Attachment service reported malformed host"),
.expect("Storage controller reported malformed host"),
shard.listen_pg_port,
)
})
@@ -1100,9 +1104,8 @@ fn get_pageserver(env: &local_env::LocalEnv, args: &ArgMatches) -> Result<PageSe
async fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
match sub_match.subcommand() {
Some(("start", subcommand_args)) => {
let register = subcommand_args.get_one::<bool>("register").unwrap_or(&true);
if let Err(e) = get_pageserver(env, subcommand_args)?
.start(&pageserver_config_overrides(subcommand_args), *register)
.start(&pageserver_config_overrides(subcommand_args))
.await
{
eprintln!("pageserver start failed: {e}");
@@ -1131,7 +1134,7 @@ async fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) ->
}
if let Err(e) = pageserver
.start(&pageserver_config_overrides(subcommand_args), false)
.start(&pageserver_config_overrides(subcommand_args))
.await
{
eprintln!("pageserver start failed: {e}");
@@ -1144,8 +1147,8 @@ async fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) ->
let scheduling = subcommand_args.get_one("scheduling");
let availability = subcommand_args.get_one("availability");
let attachment_service = AttachmentService::from_env(env);
attachment_service
let storage_controller = StorageController::from_env(env);
storage_controller
.node_configure(NodeConfigureRequest {
node_id: pageserver.conf.id,
scheduling: scheduling.cloned(),
@@ -1170,11 +1173,11 @@ async fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) ->
Ok(())
}
async fn handle_attachment_service(
async fn handle_storage_controller(
sub_match: &ArgMatches,
env: &local_env::LocalEnv,
) -> Result<()> {
let svc = AttachmentService::from_env(env);
let svc = StorageController::from_env(env);
match sub_match.subcommand() {
Some(("start", _start_match)) => {
if let Err(e) = svc.start().await {
@@ -1194,8 +1197,8 @@ async fn handle_attachment_service(
exit(1);
}
}
Some((sub_name, _)) => bail!("Unexpected attachment_service subcommand '{}'", sub_name),
None => bail!("no attachment_service subcommand provided"),
Some((sub_name, _)) => bail!("Unexpected storage_controller subcommand '{}'", sub_name),
None => bail!("no storage_controller subcommand provided"),
}
Ok(())
}
@@ -1280,11 +1283,11 @@ async fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) ->
broker::start_broker_process(env).await?;
// Only start the attachment service if the pageserver is configured to need it
// Only start the storage controller if the pageserver is configured to need it
if env.control_plane_api.is_some() {
let attachment_service = AttachmentService::from_env(env);
if let Err(e) = attachment_service.start().await {
eprintln!("attachment_service start failed: {:#}", e);
let storage_controller = StorageController::from_env(env);
if let Err(e) = storage_controller.start().await {
eprintln!("storage_controller start failed: {:#}", e);
try_stop_all(env, true).await;
exit(1);
}
@@ -1293,7 +1296,7 @@ async fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) ->
for ps_conf in &env.pageservers {
let pageserver = PageServerNode::from_env(env, ps_conf);
if let Err(e) = pageserver
.start(&pageserver_config_overrides(sub_match), true)
.start(&pageserver_config_overrides(sub_match))
.await
{
eprintln!("pageserver {} start failed: {:#}", ps_conf.id, e);
@@ -1356,9 +1359,9 @@ async fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) {
}
if env.control_plane_api.is_some() {
let attachment_service = AttachmentService::from_env(env);
if let Err(e) = attachment_service.stop(immediate).await {
eprintln!("attachment service stop failed: {e:#}");
let storage_controller = StorageController::from_env(env);
if let Err(e) = storage_controller.stop(immediate).await {
eprintln!("storage controller stop failed: {e:#}");
}
}
}
@@ -1586,6 +1589,7 @@ fn cli() -> Command {
.about("Increase the number of shards in the tenant")
.arg(tenant_id_arg.clone())
.arg(Arg::new("shard-count").value_parser(value_parser!(u8)).long("shard-count").action(ArgAction::Set).help("Number of shards in the new tenant (default 1)"))
.arg(Arg::new("shard-stripe-size").value_parser(value_parser!(u32)).long("shard-stripe-size").action(ArgAction::Set).help("Sharding stripe size in pages"))
)
)
.subcommand(
@@ -1596,11 +1600,7 @@ fn cli() -> Command {
.subcommand(Command::new("status"))
.subcommand(Command::new("start")
.about("Start local pageserver")
.arg(pageserver_config_args.clone()).arg(Arg::new("register")
.long("register")
.default_value("true").required(false)
.value_parser(value_parser!(bool))
.value_name("register"))
.arg(pageserver_config_args.clone())
)
.subcommand(Command::new("stop")
.about("Stop local pageserver")
@@ -1618,9 +1618,9 @@ fn cli() -> Command {
)
)
.subcommand(
Command::new("attachment_service")
Command::new("storage_controller")
.arg_required_else_help(true)
.about("Manage attachment_service")
.about("Manage storage_controller")
.subcommand(Command::new("start").about("Start local pageserver").arg(pageserver_config_args.clone()))
.subcommand(Command::new("stop").about("Stop local pageserver")
.arg(stop_mode_arg.clone()))

View File

@@ -57,9 +57,9 @@ use serde::{Deserialize, Serialize};
use url::Host;
use utils::id::{NodeId, TenantId, TimelineId};
use crate::attachment_service::AttachmentService;
use crate::local_env::LocalEnv;
use crate::postgresql_conf::PostgresConf;
use crate::storage_controller::StorageController;
use compute_api::responses::{ComputeState, ComputeStatus};
use compute_api::spec::{Cluster, ComputeFeature, ComputeMode, ComputeSpec};
@@ -750,17 +750,17 @@ impl Endpoint {
let postgresql_conf = self.read_postgresql_conf()?;
spec.cluster.postgresql_conf = Some(postgresql_conf);
// If we weren't given explicit pageservers, query the attachment service
// If we weren't given explicit pageservers, query the storage controller
if pageservers.is_empty() {
let attachment_service = AttachmentService::from_env(&self.env);
let locate_result = attachment_service.tenant_locate(self.tenant_id).await?;
let storage_controller = StorageController::from_env(&self.env);
let locate_result = storage_controller.tenant_locate(self.tenant_id).await?;
pageservers = locate_result
.shards
.into_iter()
.map(|shard| {
(
Host::parse(&shard.listen_pg_addr)
.expect("Attachment service reported bad hostname"),
.expect("Storage controller reported bad hostname"),
shard.listen_pg_port,
)
})
@@ -774,7 +774,10 @@ impl Endpoint {
spec.shard_stripe_size = stripe_size.map(|s| s.0 as usize);
}
let client = reqwest::Client::new();
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(30))
.build()
.unwrap();
let response = client
.post(format!(
"http://{}:{}/configure",

View File

@@ -6,7 +6,6 @@
//! local installations.
#![deny(clippy::undocumented_unsafe_blocks)]
pub mod attachment_service;
mod background_process;
pub mod broker;
pub mod endpoint;
@@ -14,3 +13,4 @@ pub mod local_env;
pub mod pageserver;
pub mod postgresql_conf;
pub mod safekeeper;
pub mod storage_controller;

View File

@@ -72,13 +72,13 @@ pub struct LocalEnv {
#[serde(default)]
pub safekeepers: Vec<SafekeeperConf>,
// Control plane upcall API for pageserver: if None, we will not run attachment_service. If set, this will
// Control plane upcall API for pageserver: if None, we will not run storage_controller If set, this will
// be propagated into each pageserver's configuration.
#[serde(default)]
pub control_plane_api: Option<Url>,
// Control plane upcall API for attachment service. If set, this will be propagated into the
// attachment service's configuration.
// Control plane upcall API for storage controller. If set, this will be propagated into the
// storage controller's configuration.
#[serde(default)]
pub control_plane_compute_hook_api: Option<Url>,
@@ -227,10 +227,10 @@ impl LocalEnv {
self.neon_distrib_dir.join("pageserver")
}
pub fn attachment_service_bin(&self) -> PathBuf {
// Irrespective of configuration, attachment service binary is always
pub fn storage_controller_bin(&self) -> PathBuf {
// Irrespective of configuration, storage controller binary is always
// run from the same location as neon_local. This means that for compatibility
// tests that run old pageserver/safekeeper, they still run latest attachment service.
// tests that run old pageserver/safekeeper, they still run latest storage controller.
let neon_local_bin_dir = env::current_exe().unwrap().parent().unwrap().to_owned();
neon_local_bin_dir.join("storage_controller")
}

View File

@@ -17,7 +17,6 @@ use std::time::Duration;
use anyhow::{bail, Context};
use camino::Utf8PathBuf;
use futures::SinkExt;
use pageserver_api::controller_api::NodeRegisterRequest;
use pageserver_api::models::{
self, LocationConfig, ShardParameters, TenantHistorySize, TenantInfo, TimelineInfo,
};
@@ -31,7 +30,6 @@ use utils::{
lsn::Lsn,
};
use crate::attachment_service::AttachmentService;
use crate::local_env::PageServerConf;
use crate::{background_process, local_env::LocalEnv};
@@ -111,7 +109,7 @@ impl PageServerNode {
control_plane_api.as_str()
));
// Attachment service uses the same auth as pageserver: if JWT is enabled
// Storage controller uses the same auth as pageserver: if JWT is enabled
// for us, we will also need it to talk to them.
if matches!(self.conf.http_auth_type, AuthType::NeonJWT) {
let jwt_token = self
@@ -163,8 +161,8 @@ impl PageServerNode {
.expect("non-Unicode path")
}
pub async fn start(&self, config_overrides: &[&str], register: bool) -> anyhow::Result<()> {
self.start_node(config_overrides, false, register).await
pub async fn start(&self, config_overrides: &[&str]) -> anyhow::Result<()> {
self.start_node(config_overrides, false).await
}
fn pageserver_init(&self, config_overrides: &[&str]) -> anyhow::Result<()> {
@@ -202,6 +200,28 @@ impl PageServerNode {
String::from_utf8_lossy(&init_output.stderr),
);
// Write metadata file, used by pageserver on startup to register itself with
// the storage controller
let metadata_path = datadir.join("metadata.json");
let (_http_host, http_port) =
parse_host_port(&self.conf.listen_http_addr).expect("Unable to parse listen_http_addr");
let http_port = http_port.unwrap_or(9898);
// Intentionally hand-craft JSON: this acts as an implicit format compat test
// in case the pageserver-side structure is edited, and reflects the real life
// situation: the metadata is written by some other script.
std::fs::write(
metadata_path,
serde_json::to_vec(&serde_json::json!({
"host": "localhost",
"port": self.pg_connection_config.port(),
"http_host": "localhost",
"http_port": http_port,
}))
.unwrap(),
)
.expect("Failed to write metadata file");
Ok(())
}
@@ -209,27 +229,7 @@ impl PageServerNode {
&self,
config_overrides: &[&str],
update_config: bool,
register: bool,
) -> anyhow::Result<()> {
// Register the node with the storage controller before starting pageserver: pageserver must be registered to
// successfully call /re-attach and finish starting up.
if register {
let attachment_service = AttachmentService::from_env(&self.env);
let (pg_host, pg_port) =
parse_host_port(&self.conf.listen_pg_addr).expect("Unable to parse listen_pg_addr");
let (http_host, http_port) = parse_host_port(&self.conf.listen_http_addr)
.expect("Unable to parse listen_http_addr");
attachment_service
.node_register(NodeRegisterRequest {
node_id: self.conf.id,
listen_pg_addr: pg_host.to_string(),
listen_pg_port: pg_port.unwrap_or(5432),
listen_http_addr: http_host.to_string(),
listen_http_port: http_port.unwrap_or(80),
})
.await?;
}
// TODO: using a thread here because start_process() is not async but we need to call check_status()
let datadir = self.repo_path();
print!(

View File

@@ -10,7 +10,7 @@ use pageserver_api::{
TenantCreateRequest, TenantShardSplitRequest, TenantShardSplitResponse,
TimelineCreateRequest, TimelineInfo,
},
shard::TenantShardId,
shard::{ShardStripeSize, TenantShardId},
};
use pageserver_client::mgmt_api::ResponseErrorMessageExt;
use postgres_backend::AuthType;
@@ -24,7 +24,7 @@ use utils::{
id::{NodeId, TenantId},
};
pub struct AttachmentService {
pub struct StorageController {
env: LocalEnv,
listen: String,
path: Utf8PathBuf,
@@ -36,7 +36,7 @@ pub struct AttachmentService {
const COMMAND: &str = "storage_controller";
const ATTACHMENT_SERVICE_POSTGRES_VERSION: u32 = 16;
const STORAGE_CONTROLLER_POSTGRES_VERSION: u32 = 16;
#[derive(Serialize, Deserialize)]
pub struct AttachHookRequest {
@@ -59,7 +59,7 @@ pub struct InspectResponse {
pub attachment: Option<(u32, NodeId)>,
}
impl AttachmentService {
impl StorageController {
pub fn from_env(env: &LocalEnv) -> Self {
let path = Utf8PathBuf::from_path_buf(env.base_data_dir.clone())
.unwrap()
@@ -136,27 +136,27 @@ impl AttachmentService {
}
fn pid_file(&self) -> Utf8PathBuf {
Utf8PathBuf::from_path_buf(self.env.base_data_dir.join("attachment_service.pid"))
Utf8PathBuf::from_path_buf(self.env.base_data_dir.join("storage_controller.pid"))
.expect("non-Unicode path")
}
/// PIDFile for the postgres instance used to store attachment service state
/// PIDFile for the postgres instance used to store storage controller state
fn postgres_pid_file(&self) -> Utf8PathBuf {
Utf8PathBuf::from_path_buf(
self.env
.base_data_dir
.join("attachment_service_postgres.pid"),
.join("storage_controller_postgres.pid"),
)
.expect("non-Unicode path")
}
/// Find the directory containing postgres binaries, such as `initdb` and `pg_ctl`
///
/// This usually uses ATTACHMENT_SERVICE_POSTGRES_VERSION of postgres, but will fall back
/// This usually uses STORAGE_CONTROLLER_POSTGRES_VERSION of postgres, but will fall back
/// to other versions if that one isn't found. Some automated tests create circumstances
/// where only one version is available in pg_distrib_dir, such as `test_remote_extensions`.
pub async fn get_pg_bin_dir(&self) -> anyhow::Result<Utf8PathBuf> {
let prefer_versions = [ATTACHMENT_SERVICE_POSTGRES_VERSION, 15, 14];
let prefer_versions = [STORAGE_CONTROLLER_POSTGRES_VERSION, 15, 14];
for v in prefer_versions {
let path = Utf8PathBuf::from_path_buf(self.env.pg_bin_dir(v)?).unwrap();
@@ -189,7 +189,7 @@ impl AttachmentService {
///
/// Returns the database url
pub async fn setup_database(&self) -> anyhow::Result<String> {
const DB_NAME: &str = "attachment_service";
const DB_NAME: &str = "storage_controller";
let database_url = format!("postgresql://localhost:{}/{DB_NAME}", self.postgres_port);
let pg_bin_dir = self.get_pg_bin_dir().await?;
@@ -219,10 +219,10 @@ impl AttachmentService {
}
pub async fn start(&self) -> anyhow::Result<()> {
// Start a vanilla Postgres process used by the attachment service for persistence.
// Start a vanilla Postgres process used by the storage controller for persistence.
let pg_data_path = Utf8PathBuf::from_path_buf(self.env.base_data_dir.clone())
.unwrap()
.join("attachment_service_db");
.join("storage_controller_db");
let pg_bin_dir = self.get_pg_bin_dir().await?;
let pg_log_path = pg_data_path.join("postgres.log");
@@ -245,7 +245,7 @@ impl AttachmentService {
.await?;
};
println!("Starting attachment service database...");
println!("Starting storage controller database...");
let db_start_args = [
"-w",
"-D",
@@ -256,7 +256,7 @@ impl AttachmentService {
];
background_process::start_process(
"attachment_service_db",
"storage_controller_db",
&self.env.base_data_dir,
pg_bin_dir.join("pg_ctl").as_std_path(),
db_start_args,
@@ -300,7 +300,7 @@ impl AttachmentService {
background_process::start_process(
COMMAND,
&self.env.base_data_dir,
&self.env.attachment_service_bin(),
&self.env.storage_controller_bin(),
args,
[(
"NEON_REPO_DIR".to_string(),
@@ -322,10 +322,10 @@ impl AttachmentService {
pub async fn stop(&self, immediate: bool) -> anyhow::Result<()> {
background_process::stop_process(immediate, COMMAND, &self.pid_file())?;
let pg_data_path = self.env.base_data_dir.join("attachment_service_db");
let pg_data_path = self.env.base_data_dir.join("storage_controller_db");
let pg_bin_dir = self.get_pg_bin_dir().await?;
println!("Stopping attachment service database...");
println!("Stopping storage controller database...");
let pg_stop_args = ["-D", &pg_data_path.to_string_lossy(), "stop"];
let stop_status = Command::new(pg_bin_dir.join("pg_ctl"))
.args(pg_stop_args)
@@ -344,10 +344,10 @@ impl AttachmentService {
// fine that stop failed. Otherwise it is an error that stop failed.
const PG_STATUS_NOT_RUNNING: i32 = 3;
if Some(PG_STATUS_NOT_RUNNING) == status_exitcode.code() {
println!("Attachment service data base is already stopped");
println!("Storage controller database is already stopped");
return Ok(());
} else {
anyhow::bail!("Failed to stop attachment service database: {stop_status}")
anyhow::bail!("Failed to stop storage controller database: {stop_status}")
}
}
@@ -368,7 +368,7 @@ impl AttachmentService {
}
}
/// Simple HTTP request wrapper for calling into attachment service
/// Simple HTTP request wrapper for calling into storage controller
async fn dispatch<RQ, RS>(
&self,
method: hyper::Method,
@@ -496,11 +496,15 @@ impl AttachmentService {
&self,
tenant_id: TenantId,
new_shard_count: u8,
new_stripe_size: Option<ShardStripeSize>,
) -> anyhow::Result<TenantShardSplitResponse> {
self.dispatch(
Method::PUT,
format!("control/v1/tenant/{tenant_id}/shard_split"),
Some(TenantShardSplitRequest { new_shard_count }),
Some(TenantShardSplitRequest {
new_shard_count,
new_stripe_size,
}),
)
.await
}

View File

@@ -70,9 +70,9 @@ Should only be used e.g. for status check/tenant creation/list.
Should only be used e.g. for status check.
Currently also used for connection from any pageserver to any safekeeper.
"generations_api": Provides access to the upcall APIs served by the attachment service or the control plane.
"generations_api": Provides access to the upcall APIs served by the storage controller or the control plane.
"admin": Provides access to the control plane and admin APIs of the attachment service.
"admin": Provides access to the control plane and admin APIs of the storage controller.
### CLI
CLI generates a key pair during call to `neon_local init` with the following commands:

View File

@@ -88,8 +88,6 @@ impl FromStr for NodeAvailability {
}
}
/// FIXME: this is a duplicate of the type in the attachment_service crate, because the
/// type needs to be defined with diesel traits in there.
#[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq)]
pub enum NodeSchedulingPolicy {
Active,

View File

@@ -198,6 +198,13 @@ pub struct TimelineCreateRequest {
#[derive(Serialize, Deserialize)]
pub struct TenantShardSplitRequest {
pub new_shard_count: u8,
// A tenant's stripe size is only meaningful the first time their shard count goes
// above 1: therefore during a split from 1->N shards, we may modify the stripe size.
//
// If this is set while the stripe count is being increased from an already >1 value,
// then the request will fail with 400.
pub new_stripe_size: Option<ShardStripeSize>,
}
#[derive(Serialize, Deserialize)]

View File

@@ -6,11 +6,18 @@
use serde::{Deserialize, Serialize};
use utils::id::NodeId;
use crate::shard::TenantShardId;
use crate::{controller_api::NodeRegisterRequest, shard::TenantShardId};
/// Upcall message sent by the pageserver to the configured `control_plane_api` on
/// startup.
#[derive(Serialize, Deserialize)]
pub struct ReAttachRequest {
pub node_id: NodeId,
/// Optional inline self-registration: this is useful with the storage controller,
/// if the node already has a node_id set.
#[serde(skip_serializing_if = "Option::is_none", default)]
pub register: Option<NodeRegisterRequest>,
}
#[derive(Serialize, Deserialize)]

View File

@@ -84,9 +84,6 @@ where
info!("Handling request");
}
// Take a copy of the path for error logging
let path = request.uri().path().to_string();
// No special handling for panics here. There's a `tracing_panic_hook` from another
// module to do that globally.
let res = handler(request).await;
@@ -113,7 +110,7 @@ where
}
Ok(response)
}
Err(err) => Ok(api_error_handler(err, Some(&path))),
Err(err) => Ok(api_error_handler(err)),
}
}
.instrument(request_span)

View File

@@ -108,7 +108,7 @@ impl HttpErrorBody {
pub async fn route_error_handler(err: routerify::RouteError) -> Response<Body> {
match err.downcast::<ApiError>() {
Ok(api_error) => api_error_handler(*api_error, None),
Ok(api_error) => api_error_handler(*api_error),
Err(other_error) => {
// We expect all the request handlers to return an ApiError, so this should
// not be reached. But just in case.
@@ -121,16 +121,12 @@ pub async fn route_error_handler(err: routerify::RouteError) -> Response<Body> {
}
}
pub fn api_error_handler(api_error: ApiError, path: Option<&str>) -> Response<Body> {
pub fn api_error_handler(api_error: ApiError) -> Response<Body> {
// Print a stack trace for Internal Server errors
match api_error {
ApiError::Forbidden(_) | ApiError::Unauthorized(_) => {
warn!(
"Error processing HTTP request: {api_error:#} {}{}",
path.as_ref().map(|_| "at").unwrap_or(""),
path.unwrap_or("")
)
warn!("Error processing HTTP request: {api_error:#}")
}
ApiError::ResourceUnavailable(_) => info!("Error processing HTTP request: {api_error:#}"),
ApiError::NotFound(_) => info!("Error processing HTTP request: {api_error:#}"),

View File

@@ -123,6 +123,12 @@ impl PageserverFeedback {
rf.replytime = *PG_EPOCH - Duration::from_micros(-raw_time as u64);
}
}
b"shard_number" => {
let len = buf.get_i32();
// TODO: this will be implemented in the next update,
// for now, we just skip the value.
buf.advance(len as usize);
}
_ => {
let len = buf.get_i32();
warn!(

View File

@@ -7,8 +7,9 @@
use anyhow::{anyhow, bail, ensure, Context, Result};
use pageserver_api::shard::TenantShardId;
use remote_storage::{RemotePath, RemoteStorageConfig};
use serde;
use serde::de::IntoDeserializer;
use std::env;
use std::{collections::HashMap, env};
use storage_broker::Uri;
use utils::crashsafe::path_with_suffix_extension;
use utils::id::ConnectionId;
@@ -304,6 +305,26 @@ impl<T> BuilderValue<T> {
}
}
// Certain metadata (e.g. externally-addressable name, AZ) is delivered
// as a separate structure. This information is not neeed by the pageserver
// itself, it is only used for registering the pageserver with the control
// plane and/or storage controller.
//
#[derive(serde::Deserialize)]
pub(crate) struct NodeMetadata {
#[serde(rename = "host")]
pub(crate) postgres_host: String,
#[serde(rename = "port")]
pub(crate) postgres_port: u16,
pub(crate) http_host: String,
pub(crate) http_port: u16,
// Deployment tools may write fields to the metadata file beyond what we
// use in this type: this type intentionally only names fields that require.
#[serde(flatten)]
pub(crate) other: HashMap<String, serde_json::Value>,
}
// needed to simplify config construction
struct PageServerConfigBuilder {
listen_pg_addr: BuilderValue<String>,
@@ -761,6 +782,10 @@ impl PageServerConf {
self.workdir.join("deletion")
}
pub fn metadata_path(&self) -> Utf8PathBuf {
self.workdir.join("metadata.json")
}
pub fn deletion_list_path(&self, sequence: u64) -> Utf8PathBuf {
// Encode a version in the filename, so that if we ever switch away from JSON we can
// increment this.
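A hedged example of a `metadata.json` that the `NodeMetadata` type above would accept, using a local mirror of the struct; the `availability_zone` key is purely illustrative and shows how unknown keys end up in the flattened `other` map rather than being rejected:

```rust
use std::collections::HashMap;
use serde::Deserialize;

// Local mirror of the pageserver's NodeMetadata for illustration.
#[derive(Deserialize, Debug)]
struct NodeMetadata {
    #[serde(rename = "host")]
    postgres_host: String,
    #[serde(rename = "port")]
    postgres_port: u16,
    http_host: String,
    http_port: u16,
    #[serde(flatten)]
    other: HashMap<String, serde_json::Value>,
}

fn main() {
    let metadata = r#"{
        "host": "localhost",
        "port": 64000,
        "http_host": "localhost",
        "http_port": 9898,
        "availability_zone": "us-east-2b"
    }"#;
    let parsed: NodeMetadata = serde_json::from_str(metadata).unwrap();
    println!("{parsed:?}");
}
```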

View File

@@ -2,6 +2,7 @@ use std::collections::HashMap;
use futures::Future;
use pageserver_api::{
controller_api::NodeRegisterRequest,
shard::TenantShardId,
upcall_api::{
ReAttachRequest, ReAttachResponse, ValidateRequest, ValidateRequestTenant, ValidateResponse,
@@ -12,7 +13,10 @@ use tokio_util::sync::CancellationToken;
use url::Url;
use utils::{backoff, generation::Generation, id::NodeId};
use crate::config::PageServerConf;
use crate::{
config::{NodeMetadata, PageServerConf},
virtual_file::on_fatal_io_error,
};
/// The Pageserver's client for using the control plane API: this is a small subset
/// of the overall control plane API, for dealing with generations (see docs/rfcs/025-generation-numbers.md)
@@ -32,6 +36,7 @@ pub enum RetryForeverError {
pub trait ControlPlaneGenerationsApi {
fn re_attach(
&self,
conf: &PageServerConf,
) -> impl Future<Output = Result<HashMap<TenantShardId, Generation>, RetryForeverError>> + Send;
fn validate(
&self,
@@ -110,13 +115,59 @@ impl ControlPlaneClient {
impl ControlPlaneGenerationsApi for ControlPlaneClient {
/// Block until we get a successful response, or error out if we are shut down
async fn re_attach(&self) -> Result<HashMap<TenantShardId, Generation>, RetryForeverError> {
async fn re_attach(
&self,
conf: &PageServerConf,
) -> Result<HashMap<TenantShardId, Generation>, RetryForeverError> {
let re_attach_path = self
.base_url
.join("re-attach")
.expect("Failed to build re-attach path");
// Include registration content in the re-attach request if a metadata file is readable
let metadata_path = conf.metadata_path();
let register = match tokio::fs::read_to_string(&metadata_path).await {
Ok(metadata_str) => match serde_json::from_str::<NodeMetadata>(&metadata_str) {
Ok(m) => {
// Since we run one time at startup, be generous in our logging and
// dump all metadata.
tracing::info!(
"Loaded node metadata: postgres {}:{}, http {}:{}, other fields: {:?}",
m.postgres_host,
m.postgres_port,
m.http_host,
m.http_port,
m.other
);
Some(NodeRegisterRequest {
node_id: conf.id,
listen_pg_addr: m.postgres_host,
listen_pg_port: m.postgres_port,
listen_http_addr: m.http_host,
listen_http_port: m.http_port,
})
}
Err(e) => {
tracing::error!("Unreadable metadata in {metadata_path}: {e}");
None
}
},
Err(e) => {
if e.kind() == std::io::ErrorKind::NotFound {
// This is legal: we may have been deployed with some external script
// doing registration for us.
tracing::info!("Metadata file not found at {metadata_path}");
} else {
on_fatal_io_error(&e, &format!("Loading metadata at {metadata_path}"))
}
None
}
};
let request = ReAttachRequest {
node_id: self.node_id,
register,
};
fail::fail_point!("control-plane-client-re-attach");

View File

@@ -831,7 +831,10 @@ mod test {
}
impl ControlPlaneGenerationsApi for MockControlPlane {
async fn re_attach(&self) -> Result<HashMap<TenantShardId, Generation>, RetryForeverError> {
async fn re_attach(
&self,
_conf: &PageServerConf,
) -> Result<HashMap<TenantShardId, Generation>, RetryForeverError> {
unimplemented!()
}
async fn validate(

View File

@@ -1151,7 +1151,12 @@ async fn tenant_shard_split_handler(
let new_shards = state
.tenant_manager
.shard_split(tenant_shard_id, ShardCount::new(req.new_shard_count), &ctx)
.shard_split(
tenant_shard_id,
ShardCount::new(req.new_shard_count),
req.new_stripe_size,
&ctx,
)
.await
.map_err(ApiError::InternalServerError)?;
@@ -2247,7 +2252,7 @@ pub fn make_router(
.get("/v1/location_config", |r| {
api_handler(r, list_location_config_handler)
})
.get("/v1/location_config/:tenant_id", |r| {
.get("/v1/location_config/:tenant_shard_id", |r| {
api_handler(r, get_location_config_handler)
})
.put(

View File

@@ -4625,10 +4625,7 @@ mod tests {
drop(guard);
// Pick a big LSN such that we query over all the changes.
// Technically, u64::MAX - 1 is the largest LSN supported by the read path,
// but there seems to be a bug on the non-vectored search path which surfaces
// in that case.
let reads_lsn = Lsn(u64::MAX - 1000);
let reads_lsn = Lsn(u64::MAX - 1);
for read in reads {
info!("Doing vectored read on {:?}", read);
@@ -5145,4 +5142,23 @@ mod tests {
Ok(())
}
#[tokio::test]
async fn test_read_at_max_lsn() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_read_at_max_lsn")?;
let (tenant, ctx) = harness.load().await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
.await?;
let lsn = Lsn(0x10);
bulk_insert_compact_gc(tline.clone(), &ctx, lsn, 50, 10000).await?;
let test_key = Key::from_hex("010000000033333333444444445500000000").unwrap();
let read_lsn = Lsn(u64::MAX - 1);
assert!(tline.get(test_key, read_lsn, &ctx).await.is_ok());
Ok(())
}
}

View File

@@ -6,7 +6,9 @@ use futures::stream::StreamExt;
use itertools::Itertools;
use pageserver_api::key::Key;
use pageserver_api::models::ShardParameters;
use pageserver_api::shard::{ShardCount, ShardIdentity, ShardNumber, TenantShardId};
use pageserver_api::shard::{
ShardCount, ShardIdentity, ShardNumber, ShardStripeSize, TenantShardId,
};
use rand::{distributions::Alphanumeric, Rng};
use std::borrow::Cow;
use std::cmp::Ordering;
@@ -295,7 +297,7 @@ async fn init_load_generations(
} else if let Some(client) = ControlPlaneClient::new(conf, cancel) {
info!("Calling control plane API to re-attach tenants");
// If we are configured to use the control plane API, then it is the source of truth for what tenants to load.
match client.re_attach().await {
match client.re_attach(conf).await {
Ok(tenants) => tenants,
Err(RetryForeverError::ShuttingDown) => {
anyhow::bail!("Shut down while waiting for control plane re-attach response")
@@ -1439,11 +1441,12 @@ impl TenantManager {
&self,
tenant_shard_id: TenantShardId,
new_shard_count: ShardCount,
new_stripe_size: Option<ShardStripeSize>,
ctx: &RequestContext,
) -> anyhow::Result<Vec<TenantShardId>> {
let tenant = get_tenant(tenant_shard_id, true)?;
// Plan: identify what the new child shards will be
// Validate the incoming request
if new_shard_count.count() <= tenant_shard_id.shard_count.count() {
anyhow::bail!("Requested shard count is not an increase");
}
@@ -1452,10 +1455,18 @@ impl TenantManager {
anyhow::bail!("Requested split is not a power of two");
}
let parent_shard_identity = tenant.shard_identity;
let parent_tenant_conf = tenant.get_tenant_conf();
let parent_generation = tenant.generation;
if let Some(new_stripe_size) = new_stripe_size {
if tenant.get_shard_stripe_size() != new_stripe_size
&& tenant_shard_id.shard_count.count() > 1
{
// This tenant already has multiple shards, it is illegal to try and change its stripe size
anyhow::bail!(
"Shard stripe size may not be modified once tenant has multiple shards"
);
}
}
// Plan: identify what the new child shards will be
let child_shards = tenant_shard_id.split(new_shard_count);
tracing::info!(
"Shard {} splits into: {}",
@@ -1466,6 +1477,10 @@ impl TenantManager {
.join(",")
);
let parent_shard_identity = tenant.shard_identity;
let parent_tenant_conf = tenant.get_tenant_conf();
let parent_generation = tenant.generation;
// Phase 1: Write out child shards' remote index files, in the parent tenant's current generation
if let Err(e) = tenant.split_prepare(&child_shards).await {
// If [`Tenant::split_prepare`] fails, we must reload the tenant, because it might
@@ -1515,6 +1530,9 @@ impl TenantManager {
// Phase 3: Spawn the child shards
for child_shard in &child_shards {
let mut child_shard_identity = parent_shard_identity;
if let Some(new_stripe_size) = new_stripe_size {
child_shard_identity.stripe_size = new_stripe_size;
}
child_shard_identity.count = child_shard.shard_count;
child_shard_identity.number = child_shard.shard_number;

View File

@@ -2478,7 +2478,7 @@ impl Timeline {
// 'prev_lsn' tracks the last LSN that we were at in our search. It's used
// to check that each iteration make some progress, to break infinite
// looping if something goes wrong.
let mut prev_lsn = Lsn(u64::MAX);
let mut prev_lsn = None;
let mut result = ValueReconstructResult::Continue;
let mut cont_lsn = Lsn(request_lsn.0 + 1);
@@ -2498,18 +2498,20 @@ impl Timeline {
MATERIALIZED_PAGE_CACHE_HIT.inc_by(1);
return Ok(traversal_path);
}
if prev_lsn <= cont_lsn {
// Didn't make any progress in last iteration. Error out to avoid
// getting stuck in the loop.
return Err(layer_traversal_error(format!(
"could not find layer with more data for key {} at LSN {}, request LSN {}, ancestor {}",
key,
Lsn(cont_lsn.0 - 1),
request_lsn,
timeline.ancestor_lsn
), traversal_path));
if let Some(prev) = prev_lsn {
if prev <= cont_lsn {
// Didn't make any progress in last iteration. Error out to avoid
// getting stuck in the loop.
return Err(layer_traversal_error(format!(
"could not find layer with more data for key {} at LSN {}, request LSN {}, ancestor {}",
key,
Lsn(cont_lsn.0 - 1),
request_lsn,
timeline.ancestor_lsn
), traversal_path));
}
}
prev_lsn = cont_lsn;
prev_lsn = Some(cont_lsn);
}
ValueReconstructResult::Missing => {
return Err(layer_traversal_error(
@@ -2539,7 +2541,7 @@ impl Timeline {
timeline_owned = timeline.get_ready_ancestor_timeline(ctx).await?;
timeline = &*timeline_owned;
prev_lsn = Lsn(u64::MAX);
prev_lsn = None;
continue 'outer;
}

View File

@@ -17,7 +17,7 @@ use pin_project_lite::pin_project;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, ReadBuf};
use uuid::Uuid;
use crate::{metrics::NUM_CLIENT_CONNECTION_GAUGE, serverless::tls_listener::AsyncAccept};
use crate::metrics::NUM_CLIENT_CONNECTION_GAUGE;
pub struct ProxyProtocolAccept {
pub incoming: AddrIncoming,
@@ -331,15 +331,15 @@ impl<T: AsyncRead> AsyncRead for WithClientIp<T> {
}
}
impl AsyncAccept for ProxyProtocolAccept {
type Connection = WithConnectionGuard<WithClientIp<AddrStream>>;
impl Accept for ProxyProtocolAccept {
type Conn = WithConnectionGuard<WithClientIp<AddrStream>>;
type Error = io::Error;
fn poll_accept(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Connection, Self::Error>>> {
) -> Poll<Option<Result<Self::Conn, Self::Error>>> {
let conn = ready!(Pin::new(&mut self.incoming).poll_accept(cx)?);
tracing::info!(protocol = self.protocol, "accepted new TCP connection");
let Some(conn) = conn else {

View File

@@ -21,24 +21,19 @@ pub use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
use tokio_util::task::TaskTracker;
use crate::context::RequestMonitoring;
use crate::metrics::TLS_HANDSHAKE_FAILURES;
use crate::protocol2::{ProxyProtocolAccept, WithClientIp, WithConnectionGuard};
use crate::rate_limiter::EndpointRateLimiter;
use crate::serverless::backend::PoolingBackend;
use crate::{cancellation::CancellationHandler, config::ProxyConfig};
use futures::StreamExt;
use hyper::{
server::{
accept,
conn::{AddrIncoming, AddrStream},
},
server::conn::{AddrIncoming, AddrStream},
Body, Method, Request, Response,
};
use std::convert::Infallible;
use std::net::IpAddr;
use std::sync::Arc;
use std::task::Poll;
use std::{future::ready, sync::Arc};
use tls_listener::TlsListener;
use tokio::net::TcpListener;
use tokio_util::sync::CancellationToken;
@@ -105,19 +100,12 @@ pub async fn task_main(
let ws_connections = tokio_util::task::task_tracker::TaskTracker::new();
ws_connections.close(); // allows `ws_connections.wait to complete`
let tls_listener = TlsListener::new(tls_acceptor, addr_incoming).filter(|conn| {
if let Err(err) = conn {
error!(
protocol = "http",
"failed to accept TLS connection: {err:?}"
);
TLS_HANDSHAKE_FAILURES.inc();
ready(false)
} else {
info!(protocol = "http", "accepted new TLS connection");
ready(true)
}
});
let tls_listener = TlsListener::new(
tls_acceptor,
addr_incoming,
"http",
config.handshake_timeout,
);
let make_svc = hyper::service::make_service_fn(
|stream: &tokio_rustls::server::TlsStream<
@@ -174,7 +162,7 @@ pub async fn task_main(
},
);
hyper::Server::builder(accept::from_stream(tls_listener))
hyper::Server::builder(tls_listener)
.serve(make_svc)
.with_graceful_shutdown(cancellation_token.cancelled())
.await?;

View File

@@ -12,6 +12,7 @@ use crate::{
CachedNodeInfo,
},
context::RequestMonitoring,
error::{ErrorKind, ReportableError, UserFacingError},
proxy::connect_compute::ConnectMechanism,
};
@@ -117,6 +118,30 @@ pub enum HttpConnError {
WakeCompute(#[from] WakeComputeError),
}
impl ReportableError for HttpConnError {
fn get_error_kind(&self) -> ErrorKind {
match self {
HttpConnError::ConnectionClosedAbruptly(_) => ErrorKind::Compute,
HttpConnError::ConnectionError(p) => p.get_error_kind(),
HttpConnError::GetAuthInfo(a) => a.get_error_kind(),
HttpConnError::AuthError(a) => a.get_error_kind(),
HttpConnError::WakeCompute(w) => w.get_error_kind(),
}
}
}
impl UserFacingError for HttpConnError {
fn to_string_client(&self) -> String {
match self {
HttpConnError::ConnectionClosedAbruptly(_) => self.to_string(),
HttpConnError::ConnectionError(p) => p.to_string(),
HttpConnError::GetAuthInfo(c) => c.to_string_client(),
HttpConnError::AuthError(c) => c.to_string_client(),
HttpConnError::WakeCompute(c) => c.to_string_client(),
}
}
}
struct TokioMechanism {
pool: Arc<GlobalConnPool<tokio_postgres::Client>>,
conn_info: ConnInfo,

View File

@@ -119,16 +119,12 @@ impl<C: ClientInnerExt> EndpointConnPool<C> {
}
}
fn put(
pool: &RwLock<Self>,
conn_info: &ConnInfo,
client: ClientInner<C>,
) -> anyhow::Result<()> {
fn put(pool: &RwLock<Self>, conn_info: &ConnInfo, client: ClientInner<C>) {
let conn_id = client.conn_id;
if client.is_closed() {
info!(%conn_id, "pool: throwing away connection '{conn_info}' because connection is closed");
return Ok(());
return;
}
let global_max_conn = pool.read().global_pool_size_max_conns;
if pool
@@ -138,7 +134,7 @@ impl<C: ClientInnerExt> EndpointConnPool<C> {
>= global_max_conn
{
info!(%conn_id, "pool: throwing away connection '{conn_info}' because pool is full");
return Ok(());
return;
}
// return connection to the pool
@@ -172,8 +168,6 @@ impl<C: ClientInnerExt> EndpointConnPool<C> {
} else {
info!(%conn_id, "pool: throwing away connection '{conn_info}' because pool is full, total_conns={total_conns}");
}
Ok(())
}
}
@@ -612,13 +606,6 @@ impl<C: ClientInnerExt> Client<C> {
let inner = inner.as_mut().expect("client inner should not be removed");
(&mut inner.inner, Discard { pool, conn_info })
}
pub fn check_idle(&mut self, status: ReadyForQueryStatus) {
self.inner().1.check_idle(status)
}
pub fn discard(&mut self) {
self.inner().1.discard()
}
}
impl<C: ClientInnerExt> Discard<'_, C> {
@@ -660,7 +647,7 @@ impl<C: ClientInnerExt> Client<C> {
// return connection to the pool
return Some(move || {
let _span = current_span.enter();
let _ = EndpointConnPool::put(&conn_pool, &conn_info, client);
EndpointConnPool::put(&conn_pool, &conn_info, client);
});
}
None
@@ -739,7 +726,7 @@ mod tests {
{
let mut client = Client::new(create_inner(), conn_info.clone(), ep_pool.clone());
assert_eq!(0, pool.get_global_connections_count());
client.discard();
client.inner().1.discard();
// Discard should not add the connection from the pool.
assert_eq!(0, pool.get_global_connections_count());
}

View File

@@ -1,7 +1,11 @@
use std::pin::pin;
use std::sync::Arc;
use anyhow::bail;
use futures::future::select;
use futures::future::try_join;
use futures::future::Either;
use futures::StreamExt;
use futures::TryFutureExt;
use hyper::body::HttpBody;
use hyper::header;
use hyper::http::HeaderName;
@@ -11,13 +15,16 @@ use hyper::StatusCode;
use hyper::{Body, HeaderMap, Request};
use serde_json::json;
use serde_json::Value;
use tokio::try_join;
use tokio::time;
use tokio_postgres::error::DbError;
use tokio_postgres::error::ErrorPosition;
use tokio_postgres::error::SqlState;
use tokio_postgres::GenericClient;
use tokio_postgres::IsolationLevel;
use tokio_postgres::NoTls;
use tokio_postgres::ReadyForQueryStatus;
use tokio_postgres::Transaction;
use tokio_util::sync::CancellationToken;
use tracing::error;
use tracing::info;
use url::Url;
@@ -30,9 +37,13 @@ use crate::auth::ComputeUserInfoParseError;
use crate::config::ProxyConfig;
use crate::config::TlsConfig;
use crate::context::RequestMonitoring;
use crate::error::ErrorKind;
use crate::error::ReportableError;
use crate::error::UserFacingError;
use crate::metrics::HTTP_CONTENT_LENGTH;
use crate::metrics::NUM_CONNECTION_REQUESTS_GAUGE;
use crate::proxy::NeonOptions;
use crate::serverless::backend::HttpConnError;
use crate::DbName;
use crate::RoleName;
@@ -40,6 +51,7 @@ use super::backend::PoolingBackend;
use super::conn_pool::ConnInfo;
use super::json::json_to_pg_text;
use super::json::pg_text_row_to_json;
use super::json::JsonConversionError;
#[derive(serde::Deserialize)]
#[serde(rename_all = "camelCase")]
@@ -110,6 +122,18 @@ pub enum ConnInfoError {
MalformedEndpoint,
}
impl ReportableError for ConnInfoError {
fn get_error_kind(&self) -> ErrorKind {
ErrorKind::User
}
}
impl UserFacingError for ConnInfoError {
fn to_string_client(&self) -> String {
self.to_string()
}
}
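
The classification above hangs off two small traits: `get_error_kind` drives metrics, `to_string_client` controls what the caller is allowed to see. The sketch below is a minimal, self-contained approximation of that pattern; the real `ReportableError`/`UserFacingError` definitions live in the proxy's `error` module, and the variant set and metric label strings here are assumptions:

```rust
// Minimal sketch of the error-classification pattern; trait definitions and
// label strings are assumptions, not the proxy's actual error module.
use std::fmt;

#[derive(Clone, Copy, Debug)]
enum ErrorKind {
    User,
    ClientDisconnect,
    Postgres,
}

impl ErrorKind {
    fn to_metric_label(self) -> &'static str {
        match self {
            ErrorKind::User => "user",
            ErrorKind::ClientDisconnect => "clientdisconnect",
            ErrorKind::Postgres => "postgres",
        }
    }
}

trait ReportableError: fmt::Display {
    fn get_error_kind(&self) -> ErrorKind;
}

trait UserFacingError: ReportableError {
    // What the client is allowed to see; internal errors would return
    // something less revealing than their Display output.
    fn to_string_client(&self) -> String;
}

#[derive(Debug, thiserror::Error)]
enum DemoError {
    #[error("malformed endpoint")]
    MalformedEndpoint,
}

impl ReportableError for DemoError {
    fn get_error_kind(&self) -> ErrorKind {
        ErrorKind::User
    }
}

impl UserFacingError for DemoError {
    fn to_string_client(&self) -> String {
        self.to_string() // user errors are safe to show verbatim
    }
}

fn main() {
    let e = DemoError::MalformedEndpoint;
    println!("{} -> {}", e.get_error_kind().to_metric_label(), e.to_string_client());
}
```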
fn get_conn_info(
ctx: &mut RequestMonitoring,
headers: &HeaderMap,
@@ -194,108 +218,123 @@ pub async fn handle(
request: Request<Body>,
backend: Arc<PoolingBackend>,
) -> Result<Response<Body>, ApiError> {
let result = tokio::time::timeout(
config.http_config.request_timeout,
handle_inner(config, &mut ctx, request, backend),
)
.await;
let cancel = CancellationToken::new();
let cancel2 = cancel.clone();
let handle = tokio::spawn(async move {
time::sleep(config.http_config.request_timeout).await;
cancel2.cancel();
});
let result = handle_inner(cancel, config, &mut ctx, request, backend).await;
handle.abort();
let mut response = match result {
Ok(r) => match r {
Ok(r) => {
ctx.set_success();
r
}
Err(e) => {
// TODO: ctx.set_error_kind(e.get_error_type());
let mut message = format!("{:?}", e);
let db_error = e
.downcast_ref::<tokio_postgres::Error>()
.and_then(|e| e.as_db_error());
fn get<'a, T: serde::Serialize>(
db: Option<&'a DbError>,
x: impl FnOnce(&'a DbError) -> T,
) -> Value {
db.map(x)
.and_then(|t| serde_json::to_value(t).ok())
.unwrap_or_default()
}
if let Some(db_error) = db_error {
db_error.message().clone_into(&mut message);
}
let position = db_error.and_then(|db| db.position());
let (position, internal_position, internal_query) = match position {
Some(ErrorPosition::Original(position)) => (
Value::String(position.to_string()),
Value::Null,
Value::Null,
),
Some(ErrorPosition::Internal { position, query }) => (
Value::Null,
Value::String(position.to_string()),
Value::String(query.clone()),
),
None => (Value::Null, Value::Null, Value::Null),
};
let code = get(db_error, |db| db.code().code());
let severity = get(db_error, |db| db.severity());
let detail = get(db_error, |db| db.detail());
let hint = get(db_error, |db| db.hint());
let where_ = get(db_error, |db| db.where_());
let table = get(db_error, |db| db.table());
let column = get(db_error, |db| db.column());
let schema = get(db_error, |db| db.schema());
let datatype = get(db_error, |db| db.datatype());
let constraint = get(db_error, |db| db.constraint());
let file = get(db_error, |db| db.file());
let line = get(db_error, |db| db.line().map(|l| l.to_string()));
let routine = get(db_error, |db| db.routine());
error!(
?code,
"sql-over-http per-client task finished with an error: {e:#}"
);
// TODO: this shouldn't always be bad request.
json_response(
StatusCode::BAD_REQUEST,
json!({
"message": message,
"code": code,
"detail": detail,
"hint": hint,
"position": position,
"internalPosition": internal_position,
"internalQuery": internal_query,
"severity": severity,
"where": where_,
"table": table,
"column": column,
"schema": schema,
"dataType": datatype,
"constraint": constraint,
"file": file,
"line": line,
"routine": routine,
}),
)?
}
},
Err(_) => {
// TODO: when http error classification is done, distinguish between
// timeout on sql vs timeout in proxy/cplane
// ctx.set_error_kind(crate::error::ErrorKind::RateLimit);
Ok(r) => {
ctx.set_success();
r
}
Err(e @ SqlOverHttpError::Cancelled(_)) => {
let error_kind = e.get_error_kind();
ctx.set_error_kind(error_kind);
let message = format!(
"HTTP-Connection timed out, execution time exceeded {} seconds",
config.http_config.request_timeout.as_secs()
"Query cancelled, runtime exceeded. SQL queries over HTTP must not exceed {} seconds of runtime. Please consider using our websocket based connections",
config.http_config.request_timeout.as_secs_f64()
);
error!(message);
tracing::info!(
kind=error_kind.to_metric_label(),
error=%e,
msg=message,
"forwarding error to user"
);
json_response(
StatusCode::GATEWAY_TIMEOUT,
json!({ "message": message, "code": StatusCode::GATEWAY_TIMEOUT.as_u16() }),
StatusCode::BAD_REQUEST,
json!({ "message": message, "code": SqlState::PROTOCOL_VIOLATION.code() }),
)?
}
Err(e) => {
let error_kind = e.get_error_kind();
ctx.set_error_kind(error_kind);
let mut message = e.to_string_client();
let db_error = match &e {
SqlOverHttpError::ConnectCompute(HttpConnError::ConnectionError(e))
| SqlOverHttpError::Postgres(e) => e.as_db_error(),
_ => None,
};
fn get<'a, T: serde::Serialize>(
db: Option<&'a DbError>,
x: impl FnOnce(&'a DbError) -> T,
) -> Value {
db.map(x)
.and_then(|t| serde_json::to_value(t).ok())
.unwrap_or_default()
}
if let Some(db_error) = db_error {
db_error.message().clone_into(&mut message);
}
let position = db_error.and_then(|db| db.position());
let (position, internal_position, internal_query) = match position {
Some(ErrorPosition::Original(position)) => (
Value::String(position.to_string()),
Value::Null,
Value::Null,
),
Some(ErrorPosition::Internal { position, query }) => (
Value::Null,
Value::String(position.to_string()),
Value::String(query.clone()),
),
None => (Value::Null, Value::Null, Value::Null),
};
let code = get(db_error, |db| db.code().code());
let severity = get(db_error, |db| db.severity());
let detail = get(db_error, |db| db.detail());
let hint = get(db_error, |db| db.hint());
let where_ = get(db_error, |db| db.where_());
let table = get(db_error, |db| db.table());
let column = get(db_error, |db| db.column());
let schema = get(db_error, |db| db.schema());
let datatype = get(db_error, |db| db.datatype());
let constraint = get(db_error, |db| db.constraint());
let file = get(db_error, |db| db.file());
let line = get(db_error, |db| db.line().map(|l| l.to_string()));
let routine = get(db_error, |db| db.routine());
tracing::info!(
kind=error_kind.to_metric_label(),
error=%e,
msg=message,
"forwarding error to user"
);
// TODO: this shouldn't always be bad request.
json_response(
StatusCode::BAD_REQUEST,
json!({
"message": message,
"code": code,
"detail": detail,
"hint": hint,
"position": position,
"internalPosition": internal_position,
"internalQuery": internal_query,
"severity": severity,
"where": where_,
"table": table,
"column": column,
"schema": schema,
"dataType": datatype,
"constraint": constraint,
"file": file,
"line": line,
"routine": routine,
}),
)?
}
};
@@ -307,12 +346,101 @@ pub async fn handle(
Ok(response)
}
#[derive(Debug, thiserror::Error)]
pub enum SqlOverHttpError {
#[error("{0}")]
ReadPayload(#[from] ReadPayloadError),
#[error("{0}")]
ConnectCompute(#[from] HttpConnError),
#[error("{0}")]
ConnInfo(#[from] ConnInfoError),
#[error("request is too large (max is {MAX_REQUEST_SIZE} bytes)")]
RequestTooLarge,
#[error("response is too large (max is {MAX_RESPONSE_SIZE} bytes)")]
ResponseTooLarge,
#[error("invalid isolation level")]
InvalidIsolationLevel,
#[error("{0}")]
Postgres(#[from] tokio_postgres::Error),
#[error("{0}")]
JsonConversion(#[from] JsonConversionError),
#[error("{0}")]
Cancelled(SqlOverHttpCancel),
}
impl ReportableError for SqlOverHttpError {
fn get_error_kind(&self) -> ErrorKind {
match self {
SqlOverHttpError::ReadPayload(e) => e.get_error_kind(),
SqlOverHttpError::ConnectCompute(e) => e.get_error_kind(),
SqlOverHttpError::ConnInfo(e) => e.get_error_kind(),
SqlOverHttpError::RequestTooLarge => ErrorKind::User,
SqlOverHttpError::ResponseTooLarge => ErrorKind::User,
SqlOverHttpError::InvalidIsolationLevel => ErrorKind::User,
SqlOverHttpError::Postgres(p) => p.get_error_kind(),
SqlOverHttpError::JsonConversion(_) => ErrorKind::Postgres,
SqlOverHttpError::Cancelled(c) => c.get_error_kind(),
}
}
}
impl UserFacingError for SqlOverHttpError {
fn to_string_client(&self) -> String {
match self {
SqlOverHttpError::ReadPayload(p) => p.to_string(),
SqlOverHttpError::ConnectCompute(c) => c.to_string_client(),
SqlOverHttpError::ConnInfo(c) => c.to_string_client(),
SqlOverHttpError::RequestTooLarge => self.to_string(),
SqlOverHttpError::ResponseTooLarge => self.to_string(),
SqlOverHttpError::InvalidIsolationLevel => self.to_string(),
SqlOverHttpError::Postgres(p) => p.to_string(),
SqlOverHttpError::JsonConversion(_) => "could not parse postgres response".to_string(),
SqlOverHttpError::Cancelled(_) => self.to_string(),
}
}
}
#[derive(Debug, thiserror::Error)]
pub enum ReadPayloadError {
#[error("could not read the HTTP request body: {0}")]
Read(#[from] hyper::Error),
#[error("could not parse the HTTP request body: {0}")]
Parse(#[from] serde_json::Error),
}
impl ReportableError for ReadPayloadError {
fn get_error_kind(&self) -> ErrorKind {
match self {
ReadPayloadError::Read(_) => ErrorKind::ClientDisconnect,
ReadPayloadError::Parse(_) => ErrorKind::User,
}
}
}
#[derive(Debug, thiserror::Error)]
pub enum SqlOverHttpCancel {
#[error("query was cancelled")]
Postgres,
#[error("query was cancelled while stuck trying to connect to the database")]
Connect,
}
impl ReportableError for SqlOverHttpCancel {
fn get_error_kind(&self) -> ErrorKind {
match self {
SqlOverHttpCancel::Postgres => ErrorKind::RateLimit,
SqlOverHttpCancel::Connect => ErrorKind::ServiceRateLimit,
}
}
}
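
The timeout mechanism in `handle` above replaces a blanket `tokio::time::timeout` with a spawned sleeper that cancels a `CancellationToken` after the deadline, so each stage of the work can map cancellation to its own `SqlOverHttpCancel` variant and decide how to clean up. A hedged sketch of that shape with simplified names, not the proxy's actual handler:

```rust
// Sketch: a sleeper task fires the cancellation token after the deadline, and
// the work races its own progress against `cancelled()`.
use std::time::Duration;

use tokio::time;
use tokio_util::sync::CancellationToken;

async fn do_work(cancel: CancellationToken) -> Result<&'static str, &'static str> {
    tokio::select! {
        _ = time::sleep(Duration::from_millis(50)) => Ok("done"),
        _ = cancel.cancelled() => Err("cancelled"),
    }
}

#[tokio::main]
async fn main() {
    let cancel = CancellationToken::new();
    let cancel2 = cancel.clone();
    // Sleeper that cancels the token once the request timeout elapses.
    let sleeper = tokio::spawn(async move {
        time::sleep(Duration::from_millis(10)).await;
        cancel2.cancel();
    });
    let res = do_work(cancel).await;
    sleeper.abort(); // the sleeper is no longer needed once the work returns
    println!("{res:?}"); // Err("cancelled"): the deadline is shorter than the work
}
```

The advantage over wrapping the whole handler in a timeout is that the handler itself observes the cancellation and can decide, per stage, whether the pooled connection is still safe to reuse.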
async fn handle_inner(
cancel: CancellationToken,
config: &'static ProxyConfig,
ctx: &mut RequestMonitoring,
request: Request<Body>,
backend: Arc<PoolingBackend>,
) -> anyhow::Result<Response<Body>> {
) -> Result<Response<Body>, SqlOverHttpError> {
let _request_gauge = NUM_CONNECTION_REQUESTS_GAUGE
.with_label_values(&[ctx.protocol])
.guard();
@@ -345,7 +473,7 @@ async fn handle_inner(
b"ReadUncommitted" => IsolationLevel::ReadUncommitted,
b"ReadCommitted" => IsolationLevel::ReadCommitted,
b"RepeatableRead" => IsolationLevel::RepeatableRead,
_ => bail!("invalid isolation level"),
_ => return Err(SqlOverHttpError::InvalidIsolationLevel),
}),
None => None,
};
@@ -363,19 +491,16 @@ async fn handle_inner(
// we don't have a streaming request support yet so this is to prevent OOM
// from a malicious user sending an extremely large request body
if request_content_length > MAX_REQUEST_SIZE {
return Err(anyhow::anyhow!(
"request is too large (max is {MAX_REQUEST_SIZE} bytes)"
));
return Err(SqlOverHttpError::RequestTooLarge);
}
let fetch_and_process_request = async {
let body = hyper::body::to_bytes(request.into_body())
.await
.map_err(anyhow::Error::from)?;
let body = hyper::body::to_bytes(request.into_body()).await?;
info!(length = body.len(), "request payload read");
let payload: Payload = serde_json::from_slice(&body)?;
Ok::<Payload, anyhow::Error>(payload) // Adjust error type accordingly
};
Ok::<Payload, ReadPayloadError>(payload) // Adjust error type accordingly
}
.map_err(SqlOverHttpError::from);
let authenticate_and_connect = async {
let keys = backend.authenticate(ctx, &conn_info).await?;
@@ -385,11 +510,25 @@ async fn handle_inner(
// not strictly necessary to mark success here,
// but it's just insurance for if we forget it somewhere else
ctx.latency_timer.success();
Ok::<_, anyhow::Error>(client)
};
Ok::<_, HttpConnError>(client)
}
.map_err(SqlOverHttpError::from);
// Run both operations in parallel
let (payload, mut client) = try_join!(fetch_and_process_request, authenticate_and_connect)?;
let (payload, mut client) = match select(
try_join(
pin!(fetch_and_process_request),
pin!(authenticate_and_connect),
),
pin!(cancel.cancelled()),
)
.await
{
Either::Left((result, _cancelled)) => result?,
Either::Right((_cancelled, _)) => {
return Err(SqlOverHttpError::Cancelled(SqlOverHttpCancel::Connect))
}
};
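
The payload read and the compute connection are raced against that same token using `select(try_join(..), cancelled)`. A minimal sketch of that shape with illustrative names, not the proxy's real futures:

```rust
// Sketch: run two fallible futures concurrently with try_join, but bail out
// early if the cancellation token fires first.
use std::pin::pin;

use futures::future::{select, try_join, Either};
use tokio_util::sync::CancellationToken;

async fn fetch_payload() -> Result<String, &'static str> {
    Ok("payload".to_string())
}

async fn connect() -> Result<u32, &'static str> {
    Ok(42)
}

#[tokio::main]
async fn main() -> Result<(), &'static str> {
    let cancel = CancellationToken::new();
    let (payload, conn) = match select(
        try_join(pin!(fetch_payload()), pin!(connect())),
        pin!(cancel.cancelled()),
    )
    .await
    {
        // Both halves finished (or one of them failed) before cancellation.
        Either::Left((result, _cancelled)) => result?,
        // The token fired first; the joined future is dropped here.
        Either::Right((_cancelled, _)) => return Err("cancelled while connecting"),
    };
    println!("{payload} via connection {conn}");
    Ok(())
}
```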
let mut response = Response::builder()
.status(StatusCode::OK)
@@ -401,19 +540,64 @@ async fn handle_inner(
let mut size = 0;
let result = match payload {
Payload::Single(stmt) => {
let (status, results) =
query_to_json(&*client, stmt, &mut 0, raw_output, default_array_mode)
.await
.map_err(|e| {
client.discard();
e
})?;
client.check_idle(status);
results
let mut size = 0;
let (inner, mut discard) = client.inner();
let cancel_token = inner.cancel_token();
let query = pin!(query_to_json(
&*inner,
stmt,
&mut size,
raw_output,
default_array_mode
));
let cancelled = pin!(cancel.cancelled());
let res = select(query, cancelled).await;
match res {
Either::Left((Ok((status, results)), _cancelled)) => {
discard.check_idle(status);
results
}
Either::Left((Err(e), _cancelled)) => {
discard.discard();
return Err(e);
}
Either::Right((_cancelled, query)) => {
if let Err(err) = cancel_token.cancel_query(NoTls).await {
tracing::error!(?err, "could not cancel query");
}
match time::timeout(time::Duration::from_millis(100), query).await {
Ok(Ok((status, results))) => {
discard.check_idle(status);
results
}
Ok(Err(error)) => {
let db_error = match &error {
SqlOverHttpError::ConnectCompute(
HttpConnError::ConnectionError(e),
)
| SqlOverHttpError::Postgres(e) => e.as_db_error(),
_ => None,
};
// if errored for some other reason, it might not be safe to return
if !db_error.is_some_and(|e| *e.code() == SqlState::QUERY_CANCELED) {
discard.discard();
}
return Err(SqlOverHttpError::Cancelled(SqlOverHttpCancel::Postgres));
}
Err(_timeout) => {
discard.discard();
return Err(SqlOverHttpError::Cancelled(SqlOverHttpCancel::Postgres));
}
}
}
}
}
Payload::Batch(statements) => {
info!("starting transaction");
let (inner, mut discard) = client.inner();
let cancel_token = inner.cancel_token();
let mut builder = inner.build_transaction();
if let Some(isolation_level) = txn_isolation_level {
builder = builder.isolation_level(isolation_level);
@@ -433,6 +617,7 @@ async fn handle_inner(
})?;
let results = match query_batch(
cancel.child_token(),
&transaction,
statements,
&mut size,
@@ -452,6 +637,15 @@ async fn handle_inner(
discard.check_idle(status);
results
}
Err(SqlOverHttpError::Cancelled(_)) => {
if let Err(err) = cancel_token.cancel_query(NoTls).await {
tracing::error!(?err, "could not cancel query");
}
// TODO: after cancelling, wait to see if we can get a status. maybe the connection is still safe.
discard.discard();
return Err(SqlOverHttpError::Cancelled(SqlOverHttpCancel::Postgres));
}
Err(err) => {
info!("rollback");
let status = transaction.rollback().await.map_err(|e| {
@@ -466,16 +660,10 @@ async fn handle_inner(
};
if txn_read_only {
response = response.header(
TXN_READ_ONLY.clone(),
HeaderValue::try_from(txn_read_only.to_string())?,
);
response = response.header(TXN_READ_ONLY.clone(), &HEADER_VALUE_TRUE);
}
if txn_deferrable {
response = response.header(
TXN_DEFERRABLE.clone(),
HeaderValue::try_from(txn_deferrable.to_string())?,
);
response = response.header(TXN_DEFERRABLE.clone(), &HEADER_VALUE_TRUE);
}
if let Some(txn_isolation_level) = txn_isolation_level_raw {
response = response.header(TXN_ISOLATION_LEVEL.clone(), txn_isolation_level);
@@ -503,19 +691,37 @@ async fn handle_inner(
}
async fn query_batch(
cancel: CancellationToken,
transaction: &Transaction<'_>,
queries: BatchQueryData,
total_size: &mut usize,
raw_output: bool,
array_mode: bool,
) -> anyhow::Result<Vec<Value>> {
) -> Result<Vec<Value>, SqlOverHttpError> {
let mut results = Vec::with_capacity(queries.queries.len());
let mut current_size = 0;
for stmt in queries.queries {
// TODO: maybe we should check that the transaction bit is set here
let (_, values) =
query_to_json(transaction, stmt, &mut current_size, raw_output, array_mode).await?;
results.push(values);
let query = pin!(query_to_json(
transaction,
stmt,
&mut current_size,
raw_output,
array_mode
));
let cancelled = pin!(cancel.cancelled());
let res = select(query, cancelled).await;
match res {
// TODO: maybe we should check that the transaction bit is set here
Either::Left((Ok((_, values)), _cancelled)) => {
results.push(values);
}
Either::Left((Err(e), _cancelled)) => {
return Err(e);
}
Either::Right((_cancelled, _)) => {
return Err(SqlOverHttpError::Cancelled(SqlOverHttpCancel::Postgres));
}
}
}
*total_size += current_size;
Ok(results)
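
When the token fires mid-query, the handler asks Postgres to abort the statement through the connection's `CancelToken`. A stand-alone sketch of that flow, assuming a locally running Postgres (the connection string is a placeholder):

```rust
// Sketch: grab the CancelToken before issuing the query, race the query
// against a CancellationToken, and on cancellation abort it server-side.
use std::time::Duration;

use tokio::time;
use tokio_postgres::NoTls;
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() -> Result<(), tokio_postgres::Error> {
    // Placeholder connection string: assumes a local Postgres is available.
    let (client, connection) =
        tokio_postgres::connect("host=localhost user=postgres", NoTls).await?;
    tokio::spawn(connection); // drive the connection in the background

    let cancel = CancellationToken::new();
    let cancel2 = cancel.clone();
    tokio::spawn(async move {
        time::sleep(Duration::from_millis(100)).await;
        cancel2.cancel();
    });

    let cancel_token = client.cancel_token();
    tokio::select! {
        res = client.query_one("SELECT pg_sleep(10)", &[]) => {
            println!("query finished: {:?}", res.map(|_| ()));
        }
        _ = cancel.cancelled() => {
            // Best-effort server-side cancellation; here the in-flight query
            // future is simply dropped.
            if let Err(err) = cancel_token.cancel_query(NoTls).await {
                eprintln!("could not cancel query: {err}");
            }
        }
    }
    Ok(())
}
```

The code above additionally gives the original query a short grace period after cancelling and only keeps the connection if the query failed with `QUERY_CANCELED`; the sketch omits that reuse decision.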
@@ -527,7 +733,7 @@ async fn query_to_json<T: GenericClient>(
current_size: &mut usize,
raw_output: bool,
default_array_mode: bool,
) -> anyhow::Result<(ReadyForQueryStatus, Value)> {
) -> Result<(ReadyForQueryStatus, Value), SqlOverHttpError> {
info!("executing query");
let query_params = data.params;
let mut row_stream = std::pin::pin!(client.query_raw_txt(&data.query, query_params).await?);
@@ -544,9 +750,7 @@ async fn query_to_json<T: GenericClient>(
// we don't have a streaming response support yet so this is to prevent OOM
// from a malicious query (eg a cross join)
if *current_size > MAX_RESPONSE_SIZE {
return Err(anyhow::anyhow!(
"response is too large (max is {MAX_RESPONSE_SIZE} bytes)"
));
return Err(SqlOverHttpError::ResponseTooLarge);
}
}

View File

@@ -1,186 +1,110 @@
use std::{
convert::Infallible,
pin::Pin,
task::{Context, Poll},
time::Duration,
};
use futures::{Future, Stream, StreamExt};
use hyper::server::{accept::Accept, conn::AddrStream};
use pin_project_lite::pin_project;
use thiserror::Error;
use tokio::{
io::{AsyncRead, AsyncWrite},
task::JoinSet,
time::timeout,
};
use tokio_rustls::{server::TlsStream, TlsAcceptor};
use tracing::{info, warn};
/// Default timeout for the TLS handshake.
pub const DEFAULT_HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(10);
use crate::{
metrics::TLS_HANDSHAKE_FAILURES,
protocol2::{WithClientIp, WithConnectionGuard},
};
/// Trait for TLS implementation.
///
/// Implementations are provided by the rustls and native-tls features.
pub trait AsyncTls<C: AsyncRead + AsyncWrite>: Clone {
/// The type of the TLS stream created from the underlying stream.
type Stream: Send + 'static;
/// Error type for completing the TLS handshake
type Error: std::error::Error + Send + 'static;
/// Type of the Future for the TLS stream that is accepted.
type AcceptFuture: Future<Output = Result<Self::Stream, Self::Error>> + Send + 'static;
/// Accept a TLS connection on an underlying stream
fn accept(&self, stream: C) -> Self::AcceptFuture;
pin_project! {
/// Wraps a `Stream` of connections (such as a TCP listener) so that each connection is itself
/// encrypted using TLS.
pub(crate) struct TlsListener<A: Accept> {
#[pin]
listener: A,
tls: TlsAcceptor,
waiting: JoinSet<Option<TlsStream<A::Conn>>>,
timeout: Duration,
protocol: &'static str,
}
}
/// Asynchronously accept connections.
pub trait AsyncAccept {
/// The type of the connection that is accepted.
type Connection: AsyncRead + AsyncWrite;
/// The type of error that may be returned.
type Error;
/// Poll to accept the next connection.
fn poll_accept(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Connection, Self::Error>>>;
/// Return a new `AsyncAccept` that stops accepting connections after
/// `ender` completes.
///
/// Useful for graceful shutdown.
///
/// See [examples/echo.rs](https://github.com/tmccombs/tls-listener/blob/main/examples/echo.rs)
/// for example of how to use.
fn until<F: Future>(self, ender: F) -> Until<Self, F>
where
Self: Sized,
{
Until {
acceptor: self,
ender,
impl<A: Accept> TlsListener<A> {
/// Create a `TlsListener` with default options.
pub(crate) fn new(
tls: TlsAcceptor,
listener: A,
protocol: &'static str,
timeout: Duration,
) -> Self {
TlsListener {
listener,
tls,
waiting: JoinSet::new(),
timeout,
protocol,
}
}
}
pin_project! {
///
/// Wraps a `Stream` of connections (such as a TCP listener) so that each connection is itself
/// encrypted using TLS.
///
/// It is similar to:
///
/// ```ignore
/// tcpListener.and_then(|s| tlsAcceptor.accept(s))
/// ```
///
/// except that it has the ability to accept multiple transport-level connections
/// simultaneously while the TLS handshake is pending for other connections.
///
/// By default, if a client fails the TLS handshake, that is treated as an error, and the
/// `TlsListener` will return an `Err`. If the `TlsListener` is passed directly to a hyper
/// [`Server`][1], then an invalid handshake can cause the server to stop accepting connections.
/// See [`http-stream.rs`][2] or [`http-low-level`][3] examples, for examples of how to avoid this.
///
/// Note that if the maximum number of pending connections is greater than 1, the resulting
/// [`T::Stream`][4] connections may come in a different order than the connections produced by the
/// underlying listener.
///
/// [1]: https://docs.rs/hyper/latest/hyper/server/struct.Server.html
/// [2]: https://github.com/tmccombs/tls-listener/blob/main/examples/http-stream.rs
/// [3]: https://github.com/tmccombs/tls-listener/blob/main/examples/http-low-level.rs
/// [4]: AsyncTls::Stream
///
#[allow(clippy::type_complexity)]
pub struct TlsListener<A: AsyncAccept, T: AsyncTls<A::Connection>> {
#[pin]
listener: A,
tls: T,
waiting: JoinSet<Result<Result<T::Stream, T::Error>, tokio::time::error::Elapsed>>,
timeout: Duration,
}
}
/// Builder for `TlsListener`.
#[derive(Clone)]
pub struct Builder<T> {
tls: T,
handshake_timeout: Duration,
}
/// Wraps errors from either the listener or the TLS Acceptor
#[derive(Debug, Error)]
pub enum Error<LE: std::error::Error, TE: std::error::Error> {
/// An error that arose from the listener ([AsyncAccept::Error])
#[error("{0}")]
ListenerError(#[source] LE),
/// An error that occurred during the TLS accept handshake
#[error("{0}")]
TlsAcceptError(#[source] TE),
}
impl<A: AsyncAccept, T> TlsListener<A, T>
impl<A> Accept for TlsListener<A>
where
T: AsyncTls<A::Connection>,
{
/// Create a `TlsListener` with default options.
pub fn new(tls: T, listener: A) -> Self {
builder(tls).listen(listener)
}
}
impl<A, T> TlsListener<A, T>
where
A: AsyncAccept,
A: Accept<Conn = WithConnectionGuard<WithClientIp<AddrStream>>>,
A::Error: std::error::Error,
T: AsyncTls<A::Connection>,
A::Conn: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
/// Accept the next connection
///
/// This is essentially an alias to `self.next()` with a more domain-appropriate name.
pub async fn accept(&mut self) -> Option<<Self as Stream>::Item>
where
Self: Unpin,
{
self.next().await
}
type Conn = TlsStream<A::Conn>;
/// Replaces the Tls Acceptor configuration, which will be used for new connections.
///
/// This can be used to change the certificate used at runtime.
pub fn replace_acceptor(&mut self, acceptor: T) {
self.tls = acceptor;
}
type Error = Infallible;
/// Replaces the Tls Acceptor configuration from a pinned reference to `Self`.
///
/// This is useful if your listener is `!Unpin`.
///
/// This can be used to change the certificate used at runtime.
pub fn replace_acceptor_pin(self: Pin<&mut Self>, acceptor: T) {
*self.project().tls = acceptor;
}
}
impl<A, T> Stream for TlsListener<A, T>
where
A: AsyncAccept,
A::Error: std::error::Error,
T: AsyncTls<A::Connection>,
{
type Item = Result<T::Stream, Error<A::Error, T::Error>>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
fn poll_accept(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Conn, Self::Error>>> {
let mut this = self.project();
loop {
match this.listener.as_mut().poll_accept(cx) {
Poll::Pending => break,
Poll::Ready(Some(Ok(conn))) => {
this.waiting
.spawn(timeout(*this.timeout, this.tls.accept(conn)));
Poll::Ready(Some(Ok(mut conn))) => {
let t = *this.timeout;
let tls = this.tls.clone();
let protocol = *this.protocol;
this.waiting.spawn(async move {
let peer_addr = match conn.inner.wait_for_addr().await {
Ok(Some(addr)) => addr,
Err(e) => {
tracing::error!("failed to accept TCP connection: invalid PROXY protocol V2 header: {e:#}");
return None;
}
Ok(None) => conn.inner.inner.remote_addr()
};
let accept = tls.accept(conn);
match timeout(t, accept).await {
Ok(Ok(conn)) => Some(conn),
// The handshake failed, try getting another connection from the queue
Ok(Err(e)) => {
TLS_HANDSHAKE_FAILURES.inc();
warn!(%peer_addr, protocol, "failed to accept TLS connection: {e:?}");
None
}
// The handshake timed out, try getting another connection from the queue
Err(_) => {
TLS_HANDSHAKE_FAILURES.inc();
warn!(%peer_addr, protocol, "failed to accept TLS connection: timeout");
None
}
}
});
}
Poll::Ready(Some(Err(e))) => {
return Poll::Ready(Some(Err(Error::ListenerError(e))));
tracing::error!("error accepting TCP connection: {e}");
continue;
}
Poll::Ready(None) => return Poll::Ready(None),
}
@@ -188,96 +112,19 @@ where
loop {
return match this.waiting.poll_join_next(cx) {
Poll::Ready(Some(Ok(Ok(conn)))) => {
Poll::Ready(Some(conn.map_err(Error::TlsAcceptError)))
Poll::Ready(Some(Ok(Some(conn)))) => {
info!(protocol = this.protocol, "accepted new TLS connection");
Poll::Ready(Some(Ok(conn)))
}
// The handshake timed out, try getting another connection from the queue
Poll::Ready(Some(Ok(Err(_)))) => continue,
// The handshake panicked
Poll::Ready(Some(Err(e))) if e.is_panic() => {
std::panic::resume_unwind(e.into_panic())
// The handshake failed to complete, try getting another connection from the queue
Poll::Ready(Some(Ok(None))) => continue,
// The handshake panicked or was cancelled. Ignore it and get another connection.
Poll::Ready(Some(Err(e))) => {
tracing::warn!("handshake aborted: {e}");
continue;
}
// The handshake was externally aborted
Poll::Ready(Some(Err(_))) => unreachable!("handshake tasks are never aborted"),
_ => Poll::Pending,
};
}
}
}
impl<C: AsyncRead + AsyncWrite + Unpin + Send + 'static> AsyncTls<C> for tokio_rustls::TlsAcceptor {
type Stream = tokio_rustls::server::TlsStream<C>;
type Error = std::io::Error;
type AcceptFuture = tokio_rustls::Accept<C>;
fn accept(&self, conn: C) -> Self::AcceptFuture {
tokio_rustls::TlsAcceptor::accept(self, conn)
}
}
impl<T> Builder<T> {
/// Set the timeout for handshakes.
///
/// If a timeout takes longer than `timeout`, then the handshake will be
/// aborted and the underlying connection will be dropped.
///
/// Defaults to `DEFAULT_HANDSHAKE_TIMEOUT`.
pub fn handshake_timeout(&mut self, timeout: Duration) -> &mut Self {
self.handshake_timeout = timeout;
self
}
/// Create a `TlsListener` from the builder
///
/// Actually build the `TlsListener`. The `listener` argument should be
/// an implementation of the `AsyncAccept` trait that accepts new connections
/// that the `TlsListener` will encrypt using TLS.
pub fn listen<A: AsyncAccept>(&self, listener: A) -> TlsListener<A, T>
where
T: AsyncTls<A::Connection>,
{
TlsListener {
listener,
tls: self.tls.clone(),
waiting: JoinSet::new(),
timeout: self.handshake_timeout,
}
}
}
/// Create a new Builder for a TlsListener
///
/// `server_config` will be used to configure the TLS sessions.
pub fn builder<T>(tls: T) -> Builder<T> {
Builder {
tls,
handshake_timeout: DEFAULT_HANDSHAKE_TIMEOUT,
}
}
pin_project! {
/// See [`AsyncAccept::until`]
pub struct Until<A, E> {
#[pin]
acceptor: A,
#[pin]
ender: E,
}
}
impl<A: AsyncAccept, E: Future> AsyncAccept for Until<A, E> {
type Connection = A::Connection;
type Error = A::Error;
fn poll_accept(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Connection, Self::Error>>> {
let this = self.project();
match this.ender.poll(cx) {
Poll::Pending => this.acceptor.poll_accept(cx),
Poll::Ready(_) => Poll::Ready(None),
}
}
}
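
The refactored listener boils down to: accept TCP connections, spawn each TLS handshake into a `JoinSet` with a timeout, and skip (rather than propagate) handshake failures so one bad client cannot stall the accept loop. A minimal sketch of that shape, leaving out the PROXY-protocol peer-address extraction, metrics, and `rustls::ServerConfig` setup:

```rust
// Sketch: handshakes run concurrently in a JoinSet; failures and timeouts are
// logged and dropped instead of being returned from the accept loop.
use std::time::Duration;

use tokio::{net::TcpListener, task::JoinSet, time::timeout};
use tokio_rustls::{server::TlsStream, TlsAcceptor};

async fn accept_loop(
    listener: TcpListener,
    tls: TlsAcceptor,
    handshake_timeout: Duration,
) -> std::io::Result<()> {
    let mut waiting: JoinSet<Option<TlsStream<tokio::net::TcpStream>>> = JoinSet::new();
    loop {
        tokio::select! {
            accepted = listener.accept() => {
                let (conn, peer_addr) = accepted?;
                let tls = tls.clone();
                waiting.spawn(async move {
                    match timeout(handshake_timeout, tls.accept(conn)).await {
                        Ok(Ok(stream)) => Some(stream),
                        Ok(Err(e)) => {
                            eprintln!("{peer_addr}: TLS handshake failed: {e}");
                            None
                        }
                        Err(_) => {
                            eprintln!("{peer_addr}: TLS handshake timed out");
                            None
                        }
                    }
                });
            }
            Some(joined) = waiting.join_next() => {
                match joined {
                    Ok(Some(_stream)) => println!("accepted new TLS connection"),
                    Ok(None) => {} // the handshake task already logged its failure
                    Err(e) => eprintln!("handshake task aborted: {e}"),
                }
            }
        }
    }
}
```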

View File

@@ -140,6 +140,13 @@ pub static BROKER_ITERATION_TIMELINES: Lazy<Histogram> = Lazy::new(|| {
)
.expect("Failed to register safekeeper_broker_iteration_timelines histogram vec")
});
pub static RECEIVED_PS_FEEDBACKS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"safekeeper_received_ps_feedbacks_total",
"Number of pageserver feedbacks received"
)
.expect("Failed to register safekeeper_received_ps_feedbacks_total counter")
});
pub const LABEL_UNKNOWN: &str = "unknown";
@@ -301,7 +308,8 @@ pub async fn time_io_closure<E: Into<anyhow::Error>>(
#[derive(Clone)]
pub struct FullTimelineInfo {
pub ttid: TenantTimelineId,
pub ps_feedback: PageserverFeedback,
pub ps_feedback_count: u64,
pub last_ps_feedback: PageserverFeedback,
pub wal_backup_active: bool,
pub timeline_is_active: bool,
pub num_computes: u32,
@@ -327,6 +335,7 @@ pub struct TimelineCollector {
remote_consistent_lsn: GenericGaugeVec<AtomicU64>,
ps_last_received_lsn: GenericGaugeVec<AtomicU64>,
feedback_last_time_seconds: GenericGaugeVec<AtomicU64>,
ps_feedback_count: GenericGaugeVec<AtomicU64>,
timeline_active: GenericGaugeVec<AtomicU64>,
wal_backup_active: GenericGaugeVec<AtomicU64>,
connected_computes: IntGaugeVec,
@@ -430,6 +439,15 @@ impl TimelineCollector {
.unwrap();
descs.extend(feedback_last_time_seconds.desc().into_iter().cloned());
let ps_feedback_count = GenericGaugeVec::new(
Opts::new(
"safekeeper_ps_feedback_count_total",
"Number of feedbacks received from the pageserver",
),
&["tenant_id", "timeline_id"],
)
.unwrap();
let timeline_active = GenericGaugeVec::new(
Opts::new(
"safekeeper_timeline_active",
@@ -538,6 +556,7 @@ impl TimelineCollector {
remote_consistent_lsn,
ps_last_received_lsn,
feedback_last_time_seconds,
ps_feedback_count,
timeline_active,
wal_backup_active,
connected_computes,
@@ -570,6 +589,7 @@ impl Collector for TimelineCollector {
self.remote_consistent_lsn.reset();
self.ps_last_received_lsn.reset();
self.feedback_last_time_seconds.reset();
self.ps_feedback_count.reset();
self.timeline_active.reset();
self.wal_backup_active.reset();
self.connected_computes.reset();
@@ -646,9 +666,12 @@ impl Collector for TimelineCollector {
self.ps_last_received_lsn
.with_label_values(labels)
.set(tli.ps_feedback.last_received_lsn.0);
.set(tli.last_ps_feedback.last_received_lsn.0);
self.ps_feedback_count
.with_label_values(labels)
.set(tli.ps_feedback_count);
if let Ok(unix_time) = tli
.ps_feedback
.last_ps_feedback
.replytime
.duration_since(SystemTime::UNIX_EPOCH)
{
@@ -679,6 +702,7 @@ impl Collector for TimelineCollector {
mfs.extend(self.remote_consistent_lsn.collect());
mfs.extend(self.ps_last_received_lsn.collect());
mfs.extend(self.feedback_last_time_seconds.collect());
mfs.extend(self.ps_feedback_count.collect());
mfs.extend(self.timeline_active.collect());
mfs.extend(self.wal_backup_active.collect());
mfs.extend(self.connected_computes.collect());

View File

@@ -36,11 +36,15 @@ use tokio::time::Instant;
use tracing::*;
use utils::id::TenantTimelineId;
use utils::lsn::Lsn;
use utils::pageserver_feedback::PageserverFeedback;
const DEFAULT_FEEDBACK_CAPACITY: usize = 8;
/// Registry of WalReceivers (compute connections). Timeline holds it (wrapped
/// in Arc).
pub struct WalReceivers {
mutex: Mutex<WalReceiversShared>,
pageserver_feedback_tx: tokio::sync::broadcast::Sender<PageserverFeedback>,
}
/// Id under which walreceiver is registered in shmem.
@@ -48,8 +52,12 @@ type WalReceiverId = usize;
impl WalReceivers {
pub fn new() -> Arc<WalReceivers> {
let (pageserver_feedback_tx, _) =
tokio::sync::broadcast::channel(DEFAULT_FEEDBACK_CAPACITY);
Arc::new(WalReceivers {
mutex: Mutex::new(WalReceiversShared { slots: Vec::new() }),
pageserver_feedback_tx,
})
}
@@ -116,6 +124,12 @@ impl WalReceivers {
let mut shared = self.mutex.lock();
shared.slots[id] = None;
}
/// Broadcast pageserver feedback to connected walproposers.
pub fn broadcast_pageserver_feedback(&self, feedback: PageserverFeedback) {
// Err means there are no subscribers, which is fine.
let _ = self.pageserver_feedback_tx.send(feedback);
}
}
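
The fan-out relies on a `tokio::sync::broadcast` channel: the registry owns the sender, each walproposer connection subscribes its own receiver, and sending with no subscribers is silently ignored. A small self-contained sketch, with a `u64` standing in for `PageserverFeedback`:

```rust
// Sketch of the broadcast fan-out: one sender, any number of subscribers,
// and a capacity-bounded buffer where lagging receivers just skip ahead.
use tokio::sync::broadcast;

struct FeedbackHub {
    tx: broadcast::Sender<u64>, // stand-in for PageserverFeedback
}

impl FeedbackHub {
    fn new() -> Self {
        let (tx, _) = broadcast::channel(8); // mirrors DEFAULT_FEEDBACK_CAPACITY
        Self { tx }
    }

    fn subscribe(&self) -> broadcast::Receiver<u64> {
        self.tx.subscribe()
    }

    fn broadcast(&self, feedback: u64) {
        // Err means there are no subscribers right now; that's fine.
        let _ = self.tx.send(feedback);
    }
}

#[tokio::main]
async fn main() {
    let hub = FeedbackHub::new();
    hub.broadcast(1); // dropped: nobody is listening yet

    let mut rx = hub.subscribe();
    hub.broadcast(2);
    assert_eq!(rx.recv().await.unwrap(), 2);
}
```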
/// Only a few connections are expected (normally one), so store in Vec.
@@ -197,17 +211,28 @@ impl SafekeeperPostgresHandler {
// sends, so this avoids deadlocks.
let mut pgb_reader = pgb.split().context("START_WAL_PUSH split")?;
let peer_addr = *pgb.get_peer_addr();
let network_reader = NetworkReader {
let mut network_reader = NetworkReader {
ttid: self.ttid,
conn_id: self.conn_id,
pgb_reader: &mut pgb_reader,
peer_addr,
acceptor_handle: &mut acceptor_handle,
};
let res = tokio::select! {
// todo: add read|write .context to these errors
r = network_reader.run(msg_tx, msg_rx, reply_tx) => r,
r = network_write(pgb, reply_rx) => r,
// Read first message and create timeline if needed.
let res = network_reader.read_first_message().await;
let res = if let Ok((tli, next_msg)) = res {
let pageserver_feedback_rx: tokio::sync::broadcast::Receiver<PageserverFeedback> =
tli.get_walreceivers().pageserver_feedback_tx.subscribe();
tokio::select! {
// todo: add read|write .context to these errors
r = network_reader.run(msg_tx, msg_rx, reply_tx, tli.clone(), next_msg) => r,
r = network_write(pgb, reply_rx, pageserver_feedback_rx) => r,
}
} else {
res.map(|_| ())
};
// Join pg backend back.
@@ -251,12 +276,9 @@ struct NetworkReader<'a, IO> {
}
impl<'a, IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'a, IO> {
async fn run(
self,
msg_tx: Sender<ProposerAcceptorMessage>,
msg_rx: Receiver<ProposerAcceptorMessage>,
reply_tx: Sender<AcceptorProposerMessage>,
) -> Result<(), CopyStreamHandlerEnd> {
async fn read_first_message(
&mut self,
) -> Result<(Arc<Timeline>, ProposerAcceptorMessage), CopyStreamHandlerEnd> {
// Receive information about server to create timeline, if not yet.
let next_msg = read_message(self.pgb_reader).await?;
let tli = match next_msg {
@@ -278,9 +300,19 @@ impl<'a, IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'a, IO> {
)))
}
};
Ok((tli, next_msg))
}
async fn run(
self,
msg_tx: Sender<ProposerAcceptorMessage>,
msg_rx: Receiver<ProposerAcceptorMessage>,
reply_tx: Sender<AcceptorProposerMessage>,
tli: Arc<Timeline>,
next_msg: ProposerAcceptorMessage,
) -> Result<(), CopyStreamHandlerEnd> {
*self.acceptor_handle = Some(WalAcceptor::spawn(
tli.clone(),
tli,
msg_rx,
reply_tx,
Some(self.conn_id),
@@ -320,18 +352,46 @@ async fn read_network_loop<IO: AsyncRead + AsyncWrite + Unpin>(
async fn network_write<IO: AsyncRead + AsyncWrite + Unpin>(
pgb_writer: &mut PostgresBackend<IO>,
mut reply_rx: Receiver<AcceptorProposerMessage>,
mut pageserver_feedback_rx: tokio::sync::broadcast::Receiver<PageserverFeedback>,
) -> Result<(), CopyStreamHandlerEnd> {
let mut buf = BytesMut::with_capacity(128);
// storing append_response to inject PageserverFeedback into it
let mut last_append_response = None;
loop {
match reply_rx.recv().await {
Some(msg) => {
buf.clear();
msg.serialize(&mut buf)?;
pgb_writer.write_message(&BeMessage::CopyData(&buf)).await?;
// trying to read either AcceptorProposerMessage or PageserverFeedback
let msg = tokio::select! {
reply = reply_rx.recv() => {
if let Some(msg) = reply {
if let AcceptorProposerMessage::AppendResponse(append_response) = &msg {
last_append_response = Some(append_response.clone());
}
Some(msg)
} else {
return Ok(()); // chan closed, WalAcceptor terminated
}
}
None => return Ok(()), // chan closed, WalAcceptor terminated
}
feedback = pageserver_feedback_rx.recv() =>
match (feedback, &last_append_response) {
(Ok(feedback), Some(append_response)) => {
// clone AppendResponse and inject PageserverFeedback into it
let mut append_response = append_response.clone();
append_response.pageserver_feedback = Some(feedback);
Some(AcceptorProposerMessage::AppendResponse(append_response))
}
_ => None,
}
};
let Some(msg) = msg else {
continue;
};
buf.clear();
msg.serialize(&mut buf)?;
pgb_writer.write_message(&BeMessage::CopyData(&buf)).await?;
}
}

View File

@@ -321,7 +321,7 @@ pub struct AppendRequestHeader {
}
/// Report safekeeper state to proposer
#[derive(Debug, Serialize)]
#[derive(Debug, Serialize, Clone)]
pub struct AppendResponse {
// Current term of the safekeeper; if it is higher than proposer's, the
// compute is out of date.
@@ -334,7 +334,7 @@ pub struct AppendResponse {
// a criterion for walproposer --sync mode exit
pub commit_lsn: Lsn,
pub hs_feedback: HotStandbyFeedback,
pub pageserver_feedback: PageserverFeedback,
pub pageserver_feedback: Option<PageserverFeedback>,
}
impl AppendResponse {
@@ -344,7 +344,7 @@ impl AppendResponse {
flush_lsn: Lsn(0),
commit_lsn: Lsn(0),
hs_feedback: HotStandbyFeedback::empty(),
pageserver_feedback: PageserverFeedback::empty(),
pageserver_feedback: None,
}
}
}
@@ -462,7 +462,11 @@ impl AcceptorProposerMessage {
buf.put_u64_le(msg.hs_feedback.xmin);
buf.put_u64_le(msg.hs_feedback.catalog_xmin);
msg.pageserver_feedback.serialize(buf);
// AsyncReadMessage in walproposer.c will not try to decode pageserver_feedback
// if it is not present.
if let Some(ref msg) = msg.pageserver_feedback {
msg.serialize(buf);
}
}
}
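
Making the feedback an `Option` turns it into an optional trailing blob on the wire: it is written only when present, and the reader decides by whether any bytes remain after the fixed-size fields. A toy sketch of that encoding, with a `u32` standing in for the real feedback payload:

```rust
// Sketch of an optional trailing field: older readers stop after the fixed
// fields, newer ones keep decoding only if bytes remain.
use bytes::{Buf, BufMut, BytesMut};

fn serialize(hs_xmin: u64, feedback: Option<u32>, buf: &mut BytesMut) {
    buf.put_u64_le(hs_xmin);
    if let Some(fb) = feedback {
        buf.put_u32_le(fb);
    }
}

fn deserialize(mut buf: &[u8]) -> (u64, Option<u32>) {
    let hs_xmin = buf.get_u64_le();
    let feedback = if buf.remaining() >= 4 {
        Some(buf.get_u32_le())
    } else {
        None
    };
    (hs_xmin, feedback)
}

fn main() {
    let mut with = BytesMut::new();
    serialize(1, Some(7), &mut with);
    let mut without = BytesMut::new();
    serialize(1, None, &mut without);
    assert_eq!(deserialize(&with), (1, Some(7)));
    assert_eq!(deserialize(&without), (1, None));
}
```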
@@ -681,7 +685,7 @@ where
commit_lsn: self.state.commit_lsn,
// will be filled by the upper code to avoid bothering safekeeper
hs_feedback: HotStandbyFeedback::empty(),
pageserver_feedback: PageserverFeedback::empty(),
pageserver_feedback: None,
};
trace!("formed AppendResponse {:?}", ar);
ar

View File

@@ -2,6 +2,8 @@
//! with the "START_REPLICATION" message, and registry of walsenders.
use crate::handler::SafekeeperPostgresHandler;
use crate::metrics::RECEIVED_PS_FEEDBACKS;
use crate::receive_wal::WalReceivers;
use crate::safekeeper::{Term, TermLsn};
use crate::timeline::Timeline;
use crate::wal_service::ConnectionId;
@@ -21,7 +23,7 @@ use utils::failpoint_support;
use utils::id::TenantTimelineId;
use utils::pageserver_feedback::PageserverFeedback;
use std::cmp::{max, min};
use std::cmp::min;
use std::net::SocketAddr;
use std::str;
use std::sync::Arc;
@@ -90,12 +92,14 @@ pub struct StandbyFeedback {
/// WalSenders registry. Timeline holds it (wrapped in Arc).
pub struct WalSenders {
mutex: Mutex<WalSendersShared>,
walreceivers: Arc<WalReceivers>,
}
impl WalSenders {
pub fn new() -> Arc<WalSenders> {
pub fn new(walreceivers: Arc<WalReceivers>) -> Arc<WalSenders> {
Arc::new(WalSenders {
mutex: Mutex::new(WalSendersShared::new()),
walreceivers,
})
}
@@ -151,22 +155,29 @@ impl WalSenders {
.min()
}
/// Get aggregated pageserver feedback.
pub fn get_ps_feedback(self: &Arc<WalSenders>) -> PageserverFeedback {
self.mutex.lock().agg_ps_feedback
/// Returns total counter of pageserver feedbacks received and last feedback.
pub fn get_ps_feedback_stats(self: &Arc<WalSenders>) -> (u64, PageserverFeedback) {
let shared = self.mutex.lock();
(shared.ps_feedback_counter, shared.last_ps_feedback)
}
/// Get aggregated pageserver and hot standby feedback (we send them to compute).
pub fn get_feedbacks(self: &Arc<WalSenders>) -> (PageserverFeedback, HotStandbyFeedback) {
let shared = self.mutex.lock();
(shared.agg_ps_feedback, shared.agg_hs_feedback)
/// Get aggregated hot standby feedback (we send it to compute).
pub fn get_hotstandby(self: &Arc<WalSenders>) -> HotStandbyFeedback {
self.mutex.lock().agg_hs_feedback
}
/// Record new pageserver feedback, update aggregated values.
fn record_ps_feedback(self: &Arc<WalSenders>, id: WalSenderId, feedback: &PageserverFeedback) {
let mut shared = self.mutex.lock();
shared.get_slot_mut(id).feedback = ReplicationFeedback::Pageserver(*feedback);
shared.update_ps_feedback();
shared.last_ps_feedback = *feedback;
shared.ps_feedback_counter += 1;
drop(shared);
RECEIVED_PS_FEEDBACKS.inc();
// send feedback to connected walproposers
self.walreceivers.broadcast_pageserver_feedback(*feedback);
}
/// Record standby reply.
@@ -222,8 +233,10 @@ impl WalSenders {
struct WalSendersShared {
// aggregated over all walsenders value
agg_hs_feedback: HotStandbyFeedback,
// aggregated over all walsenders value
agg_ps_feedback: PageserverFeedback,
// last feedback ever received from any pageserver, empty if none
last_ps_feedback: PageserverFeedback,
// total counter of pageserver feedbacks received
ps_feedback_counter: u64,
slots: Vec<Option<WalSenderState>>,
}
@@ -231,7 +244,8 @@ impl WalSendersShared {
fn new() -> Self {
WalSendersShared {
agg_hs_feedback: HotStandbyFeedback::empty(),
agg_ps_feedback: PageserverFeedback::empty(),
last_ps_feedback: PageserverFeedback::empty(),
ps_feedback_counter: 0,
slots: Vec::new(),
}
}
@@ -276,37 +290,6 @@ impl WalSendersShared {
}
self.agg_hs_feedback = agg;
}
/// Update aggregated pageserver feedback. LSNs (last_received,
/// disk_consistent, remote_consistent) and reply timestamp are just
/// maximized; timeline_size if taken from feedback with highest
/// last_received lsn. This is generally reasonable, but we might want to
/// implement other policies once multiple pageservers start to be actively
/// used.
fn update_ps_feedback(&mut self) {
let init = PageserverFeedback::empty();
let acc =
self.slots
.iter()
.flatten()
.fold(init, |mut acc, ws_state| match ws_state.feedback {
ReplicationFeedback::Pageserver(feedback) => {
if feedback.last_received_lsn > acc.last_received_lsn {
acc.current_timeline_size = feedback.current_timeline_size;
}
acc.last_received_lsn =
max(feedback.last_received_lsn, acc.last_received_lsn);
acc.disk_consistent_lsn =
max(feedback.disk_consistent_lsn, acc.disk_consistent_lsn);
acc.remote_consistent_lsn =
max(feedback.remote_consistent_lsn, acc.remote_consistent_lsn);
acc.replytime = max(feedback.replytime, acc.replytime);
acc
}
ReplicationFeedback::Standby(_) => acc,
});
self.agg_ps_feedback = acc;
}
}
// Serialized is used only for pretty printing in json.
@@ -443,7 +426,7 @@ impl SafekeeperPostgresHandler {
};
let mut reply_reader = ReplyReader {
reader,
ws_guard,
ws_guard: ws_guard.clone(),
tli,
};
@@ -452,6 +435,18 @@ impl SafekeeperPostgresHandler {
r = sender.run() => r,
r = reply_reader.run() => r,
};
let ws_state = ws_guard
.walsenders
.mutex
.lock()
.get_slot(ws_guard.id)
.clone();
info!(
"finished streaming to {}, feedback={:?}",
ws_state.addr, ws_state.feedback,
);
// Join pg backend back.
pgb.unsplit(reply_reader.reader)?;
@@ -733,7 +728,6 @@ async fn wait_for_lsn(
#[cfg(test)]
mod tests {
use postgres_protocol::PG_EPOCH;
use utils::id::{TenantId, TimelineId};
use super::*;
@@ -792,27 +786,4 @@ mod tests {
wss.update_hs_feedback();
assert_eq!(wss.agg_hs_feedback.xmin, 42);
}
// form pageserver feedback with given last_record_lsn / tli size and the
// rest set to dummy values.
fn ps_feedback(current_timeline_size: u64, last_received_lsn: Lsn) -> ReplicationFeedback {
ReplicationFeedback::Pageserver(PageserverFeedback {
current_timeline_size,
last_received_lsn,
disk_consistent_lsn: Lsn::INVALID,
remote_consistent_lsn: Lsn::INVALID,
replytime: *PG_EPOCH,
})
}
// test that ps aggregation works as expected
#[test]
fn test_ps_feedback() {
let mut wss = WalSendersShared::new();
push_feedback(&mut wss, ps_feedback(8, Lsn(42)));
push_feedback(&mut wss, ps_feedback(4, Lsn(84)));
wss.update_ps_feedback();
assert_eq!(wss.agg_ps_feedback.current_timeline_size, 4);
assert_eq!(wss.agg_ps_feedback.last_received_lsn, Lsn(84));
}
}

View File

@@ -402,6 +402,7 @@ impl Timeline {
)));
let (cancellation_tx, cancellation_rx) = watch::channel(false);
let walreceivers = WalReceivers::new();
Ok(Timeline {
ttid,
wal_backup_launcher_tx,
@@ -410,8 +411,8 @@ impl Timeline {
term_flush_lsn_watch_tx,
term_flush_lsn_watch_rx,
mutex: Mutex::new(shared_state),
walsenders: WalSenders::new(),
walreceivers: WalReceivers::new(),
walsenders: WalSenders::new(walreceivers.clone()),
walreceivers,
cancellation_rx,
cancellation_tx,
timeline_dir: conf.timeline_dir(&ttid),
@@ -435,6 +436,7 @@ impl Timeline {
let state =
TimelinePersistentState::new(&ttid, server_info, vec![], commit_lsn, local_start_lsn);
let walreceivers = WalReceivers::new();
Ok(Timeline {
ttid,
wal_backup_launcher_tx,
@@ -443,8 +445,8 @@ impl Timeline {
term_flush_lsn_watch_tx,
term_flush_lsn_watch_rx,
mutex: Mutex::new(SharedState::create_new(conf, &ttid, state)?),
walsenders: WalSenders::new(),
walreceivers: WalReceivers::new(),
walsenders: WalSenders::new(walreceivers.clone()),
walreceivers,
cancellation_rx,
cancellation_tx,
timeline_dir: conf.timeline_dir(&ttid),
@@ -656,12 +658,9 @@ impl Timeline {
let mut shared_state = self.write_shared_state().await;
rmsg = shared_state.sk.process_msg(msg).await?;
// if this is AppendResponse, fill in proper pageserver and hot
// standby feedback.
// if this is AppendResponse, fill in proper hot standby feedback.
if let Some(AcceptorProposerMessage::AppendResponse(ref mut resp)) = rmsg {
let (ps_feedback, hs_feedback) = self.walsenders.get_feedbacks();
resp.hs_feedback = hs_feedback;
resp.pageserver_feedback = ps_feedback;
resp.hs_feedback = self.walsenders.get_hotstandby();
}
commit_lsn = shared_state.sk.state.inmem.commit_lsn;
@@ -898,12 +897,13 @@ impl Timeline {
return None;
}
let ps_feedback = self.walsenders.get_ps_feedback();
let (ps_feedback_count, last_ps_feedback) = self.walsenders.get_ps_feedback_stats();
let state = self.write_shared_state().await;
if state.active {
Some(FullTimelineInfo {
ttid: self.ttid,
ps_feedback,
ps_feedback_count,
last_ps_feedback,
wal_backup_active: state.wal_backup_active,
timeline_is_active: state.active,
num_computes: self.walreceivers.get_num() as u32,

View File

@@ -519,9 +519,9 @@ class NeonEnvBuilder:
self.env = NeonEnv(self)
return self.env
def start(self):
def start(self, register_pageservers=False):
assert self.env is not None, "environment is not initialized yet, call init() first"
self.env.start()
self.env.start(register_pageservers=register_pageservers)
def init_start(
self,
@@ -1014,24 +1014,24 @@ class NeonEnv:
self.initial_tenant = config.initial_tenant
self.initial_timeline = config.initial_timeline
# Find two adjacent ports for attachment service and its postgres DB. This
# Find two adjacent ports for storage controller and its postgres DB. This
# loop would eventually throw from get_port() if we run out of ports (extremely
# unlikely): usually we find two adjacent free ports on the first iteration.
while True:
self.attachment_service_port = self.port_distributor.get_port()
attachment_service_pg_port = self.port_distributor.get_port()
if attachment_service_pg_port == self.attachment_service_port + 1:
self.storage_controller_port = self.port_distributor.get_port()
storage_controller_pg_port = self.port_distributor.get_port()
if storage_controller_pg_port == self.storage_controller_port + 1:
break
# The URL for the pageserver to use as its control_plane_api config
self.control_plane_api: str = f"http://127.0.0.1:{self.attachment_service_port}/upcall/v1"
# The base URL of the attachment service
self.attachment_service_api: str = f"http://127.0.0.1:{self.attachment_service_port}"
self.control_plane_api: str = f"http://127.0.0.1:{self.storage_controller_port}/upcall/v1"
# The base URL of the storage controller
self.storage_controller_api: str = f"http://127.0.0.1:{self.storage_controller_port}"
# For testing this with a fake HTTP server, enable passing through a URL from config
self.control_plane_compute_hook_api = config.control_plane_compute_hook_api
self.attachment_service: NeonAttachmentService = NeonAttachmentService(
self.storage_controller: NeonStorageController = NeonStorageController(
self, config.auth_enabled
)
@@ -1112,17 +1112,22 @@ class NeonEnv:
log.info(f"Config: {cfg}")
self.neon_cli.init(cfg, force=config.config_init_force)
def start(self):
# Attachment service starts first, so that pageserver /re-attach calls don't
def start(self, register_pageservers=False):
# storage controller starts first, so that pageserver /re-attach calls don't
# bounce through retries on startup
self.attachment_service.start()
self.storage_controller.start()
def attachment_service_ready():
assert self.attachment_service.ready() is True
def storage_controller_ready():
assert self.storage_controller.ready() is True
# Wait for attachment service readiness to prevent unnecessary post start-up
# Wait for storage controller readiness to prevent unnecessary post start-up
# reconcile.
wait_until(30, 1, attachment_service_ready)
wait_until(30, 1, storage_controller_ready)
if register_pageservers:
# Special case for forward compat tests, this can be removed later.
for pageserver in self.pageservers:
self.storage_controller.node_register(pageserver)
# Start up broker, pageserver and all safekeepers
futs = []
@@ -1153,7 +1158,7 @@ class NeonEnv:
if ps_assert_metric_no_errors:
pageserver.assert_no_metric_errors()
pageserver.stop(immediate=immediate)
self.attachment_service.stop(immediate=immediate)
self.storage_controller.stop(immediate=immediate)
self.broker.stop(immediate=immediate)
@property
@@ -1188,9 +1193,9 @@ class NeonEnv:
def get_tenant_pageserver(self, tenant_id: Union[TenantId, TenantShardId]):
"""
Get the NeonPageserver where this tenant shard is currently attached, according
to the attachment service.
to the storage controller.
"""
meta = self.attachment_service.inspect(tenant_id)
meta = self.storage_controller.inspect(tenant_id)
if meta is None:
return None
pageserver_id = meta[1]
@@ -1697,12 +1702,12 @@ class NeonCli(AbstractNeonCli):
res.check_returncode()
return res
def attachment_service_start(self):
cmd = ["attachment_service", "start"]
def storage_controller_start(self):
cmd = ["storage_controller", "start"]
return self.raw_cli(cmd)
def attachment_service_stop(self, immediate: bool):
cmd = ["attachment_service", "stop"]
def storage_controller_stop(self, immediate: bool):
cmd = ["storage_controller", "stop"]
if immediate:
cmd.extend(["-m", "immediate"])
return self.raw_cli(cmd)
@@ -1712,10 +1717,8 @@ class NeonCli(AbstractNeonCli):
id: int,
overrides: Tuple[str, ...] = (),
extra_env_vars: Optional[Dict[str, str]] = None,
register: bool = True,
) -> "subprocess.CompletedProcess[str]":
register_str = "true" if register else "false"
start_args = ["pageserver", "start", f"--id={id}", *overrides, f"--register={register_str}"]
start_args = ["pageserver", "start", f"--id={id}", *overrides]
storage = self.env.pageserver_remote_storage
append_pageserver_param_overrides(
params_to_update=start_args,
@@ -1942,14 +1945,14 @@ class Pagectl(AbstractNeonCli):
return IndexPartDump.from_json(parsed)
class AttachmentServiceApiException(Exception):
class StorageControllerApiException(Exception):
def __init__(self, message, status_code: int):
super().__init__(message)
self.message = message
self.status_code = status_code
class NeonAttachmentService(MetricsGetter):
class NeonStorageController(MetricsGetter):
def __init__(self, env: NeonEnv, auth_enabled: bool):
self.env = env
self.running = False
@@ -1957,13 +1960,13 @@ class NeonAttachmentService(MetricsGetter):
def start(self):
assert not self.running
self.env.neon_cli.attachment_service_start()
self.env.neon_cli.storage_controller_start()
self.running = True
return self
def stop(self, immediate: bool = False) -> "NeonAttachmentService":
def stop(self, immediate: bool = False) -> "NeonStorageController":
if self.running:
self.env.neon_cli.attachment_service_stop(immediate)
self.env.neon_cli.storage_controller_stop(immediate)
self.running = False
return self
@@ -1976,22 +1979,22 @@ class NeonAttachmentService(MetricsGetter):
msg = res.json()["msg"]
except: # noqa: E722
msg = ""
raise AttachmentServiceApiException(msg, res.status_code) from e
raise StorageControllerApiException(msg, res.status_code) from e
def pageserver_api(self) -> PageserverHttpClient:
"""
The attachment service implements a subset of the pageserver REST API, for mapping
The storage controller implements a subset of the pageserver REST API, for mapping
per-tenant actions into per-shard actions (e.g. timeline creation). Tests should invoke those
functions via the HttpClient, as an implicit check that these APIs remain compatible.
"""
auth_token = None
if self.auth_enabled:
auth_token = self.env.auth_keys.generate_token(scope=TokenScope.PAGE_SERVER_API)
return PageserverHttpClient(self.env.attachment_service_port, lambda: True, auth_token)
return PageserverHttpClient(self.env.storage_controller_port, lambda: True, auth_token)
def request(self, method, *args, **kwargs) -> requests.Response:
resp = requests.request(method, *args, **kwargs)
NeonAttachmentService.raise_api_exception(resp)
NeonStorageController.raise_api_exception(resp)
return resp
@@ -2004,15 +2007,15 @@ class NeonAttachmentService(MetricsGetter):
return headers
def get_metrics(self) -> Metrics:
res = self.request("GET", f"{self.env.attachment_service_api}/metrics")
res = self.request("GET", f"{self.env.storage_controller_api}/metrics")
return parse_metrics(res.text)
def ready(self) -> bool:
status = None
try:
resp = self.request("GET", f"{self.env.attachment_service_api}/ready")
resp = self.request("GET", f"{self.env.storage_controller_api}/ready")
status = resp.status_code
except AttachmentServiceApiException as e:
except StorageControllerApiException as e:
status = e.status_code
if status == 503:
@@ -2027,7 +2030,7 @@ class NeonAttachmentService(MetricsGetter):
) -> int:
response = self.request(
"POST",
f"{self.env.attachment_service_api}/debug/v1/attach-hook",
f"{self.env.storage_controller_api}/debug/v1/attach-hook",
json={"tenant_shard_id": str(tenant_shard_id), "node_id": pageserver_id},
headers=self.headers(TokenScope.ADMIN),
)
@@ -2038,7 +2041,7 @@ class NeonAttachmentService(MetricsGetter):
def attach_hook_drop(self, tenant_shard_id: Union[TenantId, TenantShardId]):
self.request(
"POST",
f"{self.env.attachment_service_api}/debug/v1/attach-hook",
f"{self.env.storage_controller_api}/debug/v1/attach-hook",
json={"tenant_shard_id": str(tenant_shard_id), "node_id": None},
headers=self.headers(TokenScope.ADMIN),
)
@@ -2049,7 +2052,7 @@ class NeonAttachmentService(MetricsGetter):
"""
response = self.request(
"POST",
f"{self.env.attachment_service_api}/debug/v1/inspect",
f"{self.env.storage_controller_api}/debug/v1/inspect",
json={"tenant_shard_id": str(tenant_shard_id)},
headers=self.headers(TokenScope.ADMIN),
)
@@ -2066,11 +2069,13 @@ class NeonAttachmentService(MetricsGetter):
"node_id": int(node.id),
"listen_http_addr": "localhost",
"listen_http_port": node.service_port.http,
"listen_pg_addr": "localhost",
"listen_pg_port": node.service_port.pg,
}
log.info(f"node_register({body})")
self.request(
"POST",
f"{self.env.attachment_service_api}/control/v1/node",
f"{self.env.storage_controller_api}/control/v1/node",
json=body,
headers=self.headers(TokenScope.ADMIN),
)
@@ -2078,7 +2083,7 @@ class NeonAttachmentService(MetricsGetter):
def node_list(self):
response = self.request(
"GET",
f"{self.env.attachment_service_api}/control/v1/node",
f"{self.env.storage_controller_api}/control/v1/node",
headers=self.headers(TokenScope.ADMIN),
)
return response.json()
@@ -2088,7 +2093,7 @@ class NeonAttachmentService(MetricsGetter):
body["node_id"] = node_id
self.request(
"PUT",
f"{self.env.attachment_service_api}/control/v1/node/{node_id}/config",
f"{self.env.storage_controller_api}/control/v1/node/{node_id}/config",
json=body,
headers=self.headers(TokenScope.ADMIN),
)
@@ -2118,7 +2123,7 @@ class NeonAttachmentService(MetricsGetter):
response = self.request(
"POST",
f"{self.env.attachment_service_api}/v1/tenant",
f"{self.env.storage_controller_api}/v1/tenant",
json=body,
headers=self.headers(TokenScope.PAGE_SERVER_API),
)
@@ -2130,18 +2135,20 @@ class NeonAttachmentService(MetricsGetter):
"""
response = self.request(
"GET",
f"{self.env.attachment_service_api}/control/v1/tenant/{tenant_id}/locate",
f"{self.env.storage_controller_api}/control/v1/tenant/{tenant_id}/locate",
headers=self.headers(TokenScope.ADMIN),
)
body = response.json()
shards: list[dict[str, Any]] = body["shards"]
return shards
def tenant_shard_split(self, tenant_id: TenantId, shard_count: int) -> list[TenantShardId]:
def tenant_shard_split(
self, tenant_id: TenantId, shard_count: int, shard_stripe_size: Optional[int] = None
) -> list[TenantShardId]:
response = self.request(
"PUT",
f"{self.env.attachment_service_api}/control/v1/tenant/{tenant_id}/shard_split",
json={"new_shard_count": shard_count},
f"{self.env.storage_controller_api}/control/v1/tenant/{tenant_id}/shard_split",
json={"new_shard_count": shard_count, "new_stripe_size": shard_stripe_size},
headers=self.headers(TokenScope.ADMIN),
)
body = response.json()
@@ -2152,7 +2159,7 @@ class NeonAttachmentService(MetricsGetter):
def tenant_shard_migrate(self, tenant_shard_id: TenantShardId, dest_ps_id: int):
self.request(
"PUT",
f"{self.env.attachment_service_api}/control/v1/tenant/{tenant_shard_id}/migrate",
f"{self.env.storage_controller_api}/control/v1/tenant/{tenant_shard_id}/migrate",
json={"tenant_shard_id": str(tenant_shard_id), "node_id": dest_ps_id},
headers=self.headers(TokenScope.ADMIN),
)
@@ -2165,12 +2172,12 @@ class NeonAttachmentService(MetricsGetter):
"""
self.request(
"POST",
f"{self.env.attachment_service_api}/debug/v1/consistency_check",
f"{self.env.storage_controller_api}/debug/v1/consistency_check",
headers=self.headers(TokenScope.ADMIN),
)
log.info("Attachment service passed consistency check")
log.info("storage controller passed consistency check")
def __enter__(self) -> "NeonAttachmentService":
def __enter__(self) -> "NeonStorageController":
return self
def __exit__(
@@ -2233,7 +2240,6 @@ class NeonPageserver(PgProtocol):
self,
overrides: Tuple[str, ...] = (),
extra_env_vars: Optional[Dict[str, str]] = None,
register: bool = True,
) -> "NeonPageserver":
"""
Start the page server.
@@ -2243,7 +2249,7 @@ class NeonPageserver(PgProtocol):
assert self.running is False
self.env.neon_cli.pageserver_start(
self.id, overrides=overrides, extra_env_vars=extra_env_vars, register=register
self.id, overrides=overrides, extra_env_vars=extra_env_vars
)
self.running = True
return self
@@ -2401,7 +2407,7 @@ class NeonPageserver(PgProtocol):
"""
client = self.http_client()
if generation is None:
generation = self.env.attachment_service.attach_hook_issue(tenant_id, self.id)
generation = self.env.storage_controller.attach_hook_issue(tenant_id, self.id)
return client.tenant_attach(
tenant_id,
config,
@@ -2410,14 +2416,14 @@ class NeonPageserver(PgProtocol):
)
def tenant_detach(self, tenant_id: TenantId):
self.env.attachment_service.attach_hook_drop(tenant_id)
self.env.storage_controller.attach_hook_drop(tenant_id)
client = self.http_client()
return client.tenant_detach(tenant_id)
def tenant_location_configure(self, tenant_id: TenantId, config: dict[str, Any], **kwargs):
if config["mode"].startswith("Attached") and "generation" not in config:
config["generation"] = self.env.attachment_service.attach_hook_issue(tenant_id, self.id)
config["generation"] = self.env.storage_controller.attach_hook_issue(tenant_id, self.id)
client = self.http_client()
return client.tenant_location_conf(tenant_id, config, **kwargs)
@@ -2441,14 +2447,14 @@ class NeonPageserver(PgProtocol):
generation: Optional[int] = None,
) -> TenantId:
if generation is None:
generation = self.env.attachment_service.attach_hook_issue(tenant_id, self.id)
generation = self.env.storage_controller.attach_hook_issue(tenant_id, self.id)
client = self.http_client(auth_token=auth_token)
return client.tenant_create(tenant_id, conf, generation=generation)
def tenant_load(self, tenant_id: TenantId):
client = self.http_client()
return client.tenant_load(
tenant_id, generation=self.env.attachment_service.attach_hook_issue(tenant_id, self.id)
tenant_id, generation=self.env.storage_controller.attach_hook_issue(tenant_id, self.id)
)
@@ -2859,6 +2865,7 @@ class NeonProxy(PgProtocol):
self.auth_backend = auth_backend
self.metric_collection_endpoint = metric_collection_endpoint
self.metric_collection_interval = metric_collection_interval
self.http_timeout_seconds = 15
self._popen: Optional[subprocess.Popen[bytes]] = None
def start(self) -> NeonProxy:
@@ -2897,6 +2904,7 @@ class NeonProxy(PgProtocol):
*["--proxy", f"{self.host}:{self.proxy_port}"],
*["--mgmt", f"{self.host}:{self.mgmt_port}"],
*["--wss", f"{self.host}:{self.external_http_port}"],
*["--sql-over-http-timeout", f"{self.http_timeout_seconds}s"],
*["-c", str(crt_path)],
*["-k", str(key_path)],
*self.auth_backend.extra_args(),
@@ -2937,6 +2945,8 @@ class NeonProxy(PgProtocol):
password = quote(kwargs["password"])
expected_code = kwargs.get("expected_code")
log.info(f"Executing http query: {query}")
connstr = f"postgresql://{user}:{password}@{self.domain}:{self.proxy_port}/postgres"
response = requests.post(
f"https://{self.domain}:{self.external_http_port}/sql",
@@ -2959,6 +2969,8 @@ class NeonProxy(PgProtocol):
password = kwargs["password"]
expected_code = kwargs.get("expected_code")
log.info(f"Executing http2 query: {query}")
connstr = f"postgresql://{user}:{password}@{self.domain}:{self.proxy_port}/postgres"
async with httpx.AsyncClient(
http2=True, verify=str(self.test_output_dir / "proxy.crt")
@@ -3907,7 +3919,7 @@ def check_restored_datadir_content(test_output_dir: Path, env: NeonEnv, endpoint
psql_path = os.path.join(pg_bin.pg_bin_path, "psql")
pageserver_id = env.attachment_service.locate(endpoint.tenant_id)[0]["node_id"]
pageserver_id = env.storage_controller.locate(endpoint.tenant_id)[0]["node_id"]
cmd = rf"""
{psql_path} \
--no-psqlrc \
@@ -3971,18 +3983,27 @@ def check_restored_datadir_content(test_output_dir: Path, env: NeonEnv, endpoint
def logical_replication_sync(subscriber: VanillaPostgres, publisher: Endpoint) -> Lsn:
"""Wait logical replication subscriber to sync with publisher."""
publisher_lsn = Lsn(publisher.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
while True:
def is_synced(publisher_lsn):
# Even if pg_stat_subscription.latest_end_lsn is caught up, some tables
# might not be synced yet, because the main apply worker continues to
# advance until the sync worker finishes.
rels_synced = subscriber.safe_psql(
"select count(*) = 0 from pg_subscription_rel where srsubstate != 'r'"
)[0][0]
log.info(f"number of not synced rels: {rels_synced}")
assert rels_synced
res = subscriber.safe_psql("select latest_end_lsn from pg_catalog.pg_stat_subscription")[0][
0
]
if res:
log.info(f"subscriber_lsn={res}")
subscriber_lsn = Lsn(res)
log.info(f"Subscriber LSN={subscriber_lsn}, publisher LSN={ publisher_lsn}")
if subscriber_lsn >= publisher_lsn:
return subscriber_lsn
time.sleep(0.5)
log.info(f"subscriber_lsn={res}")
subscriber_lsn = Lsn(res)
log.info(f"Subscriber LSN={subscriber_lsn}, publisher LSN={ publisher_lsn}")
assert subscriber_lsn >= publisher_lsn
publisher_lsn = Lsn(publisher.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
wait_until(30, 0.5, partial(is_synced, publisher_lsn))
return publisher_lsn
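A brief, hedged usage sketch of the reworked helper, assuming a VanillaPostgres subscriber and an Endpoint publisher as in its signature:

# Illustrative only: block until the subscriber has applied WAL up to the publisher's
# flush LSN and every subscribed table has reached the 'ready' sync state.
synced_lsn = logical_replication_sync(vanilla_pg, endpoint)
log.info(f"logical replication synced up to {synced_lsn}")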
def tenant_get_shards(
@@ -3994,7 +4015,7 @@ def tenant_get_shards(
us to figure out the shards for a tenant.
If the caller provides `pageserver_id`, it will be used for all shards, even
if the shard is indicated by attachment service to be on some other pageserver.
if the shard is indicated by storage controller to be on some other pageserver.
Caller should iterate over the response to apply their per-pageserver action to
each shard
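A hedged sketch of the iteration the docstring describes; the caller names and the per-shard action are illustrative:

# Illustrative only: apply a per-pageserver action to every shard of the tenant.
for shard_id, pageserver in tenant_get_shards(env, tenant_id):
    log.info(f"shard {shard_id} is attached to pageserver {pageserver.id}")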
@@ -4010,7 +4031,7 @@ def tenant_get_shards(
TenantShardId.parse(s["shard_id"]),
override_pageserver or env.get_pageserver(s["node_id"]),
)
for s in env.attachment_service.locate(tenant_id)
for s in env.storage_controller.locate(tenant_id)
]
else:
# Assume an unsharded tenant

View File

@@ -318,6 +318,13 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
assert isinstance(res_json["tenant_shards"], list)
return res_json
def tenant_get_location(self, tenant_id: TenantShardId):
res = self.get(
f"http://localhost:{self.port}/v1/location_config/{tenant_id}",
)
self.verbose_error(res)
return res.json()
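A hedged usage sketch of the new tenant_get_location helper; it mirrors the checks in the stripe-size test later in this diff, and the field name comes from that test:

# Illustrative only: read back the persisted location config for shard 0 of a 2-shard tenant.
loc = env.pageservers[0].http_client().tenant_get_location(TenantShardId(tenant_id, 0, 2))
log.info(f"stripe size for shard 0: {loc['shard_stripe_size']}")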
def tenant_delete(self, tenant_id: Union[TenantId, TenantShardId]):
res = self.delete(f"http://localhost:{self.port}/v1/tenant/{tenant_id}")
self.verbose_error(res)

View File

@@ -43,7 +43,7 @@ def single_timeline(
log.info("detach template tenant form pageserver")
env.pageserver.tenant_detach(template_tenant)
env.pageserver.allowed_errors.append(
# tenant detach causes this because the underlying attach-hook removes the tenant from attachment_service entirely
# tenant detach causes this because the underlying attach-hook removes the tenant from storage controller entirely
".*Dropped remote consistent LSN updates.*",
)

View File

@@ -56,7 +56,7 @@ def setup_env(
template_tenant, template_timeline = env.neon_cli.create_tenant(set_default=True)
env.pageserver.tenant_detach(template_tenant)
env.pageserver.allowed_errors.append(
# tenant detach causes this because the underlying attach-hook removes the tenant from attachment_service entirely
# tenant detach causes this because the underlying attach-hook removes the tenant from storage controller entirely
".*Dropped remote consistent LSN updates.*",
)
env.pageserver.tenant_attach(template_tenant, config)

View File

@@ -92,7 +92,7 @@ def setup_tenant_template(env: NeonEnv, n_txns: int):
template_tenant, template_timeline = env.neon_cli.create_tenant(set_default=True)
env.pageserver.tenant_detach(template_tenant)
env.pageserver.allowed_errors.append(
# tenant detach causes this because the underlying attach-hook removes the tenant from attachment_service entirely
# tenant detach causes this because the underlying attach-hook removes the tenant from storage controller entirely
".*Dropped remote consistent LSN updates.*",
)
env.pageserver.tenant_attach(template_tenant, config)

View File

@@ -114,7 +114,7 @@ def setup_tenant_template(env: NeonEnv, pg_bin: PgBin, scale: int):
template_tenant, template_timeline = env.neon_cli.create_tenant(set_default=True)
env.pageserver.tenant_detach(template_tenant)
env.pageserver.allowed_errors.append(
# tenant detach causes this because the underlying attach-hook removes the tenant from attachment_service entirely
# tenant detach causes this because the underlying attach-hook removes the tenant from storage controller entirely
".*Dropped remote consistent LSN updates.*",
)
env.pageserver.tenant_attach(template_tenant, config)

View File

@@ -56,12 +56,12 @@ def measure_recovery_time(env: NeonCompare):
# Delete the Tenant in the pageserver: this will drop local and remote layers, such that
# when we "create" the Tenant again, we will replay the WAL from the beginning.
#
# This is a "weird" thing to do, and can confuse the attachment service as we're re-using
# This is a "weird" thing to do, and can confuse the storage controller as we're re-using
# the same tenant ID for a tenant that is logically different from the pageserver's point
# of view, but the same as far as the safekeeper/WAL is concerned. To work around that,
# we will explicitly create the tenant in the same generation that it was previously
# attached in.
attach_status = env.env.attachment_service.inspect(tenant_shard_id=env.tenant)
attach_status = env.env.storage_controller.inspect(tenant_shard_id=env.tenant)
assert attach_status is not None
(attach_gen, _) = attach_status
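As the comment above describes, a hedged sketch of the follow-up step, re-creating the tenant under the previously issued generation; the exact call is an assumption modeled on tenant_create usage elsewhere in this diff:

# Hypothetical follow-up: recreate the logically-new tenant under the old generation number.
env.env.pageserver.http_client().tenant_create(env.tenant, generation=attach_gen)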

View File

@@ -137,7 +137,7 @@ def test_no_config(positive_env: NeonEnv, content_type: Optional[str]):
ps_http.tenant_detach(tenant_id)
assert tenant_id not in [TenantId(t["id"]) for t in ps_http.tenant_list()]
body = {"generation": env.attachment_service.attach_hook_issue(tenant_id, env.pageserver.id)}
body = {"generation": env.storage_controller.attach_hook_issue(tenant_id, env.pageserver.id)}
ps_http.post(
f"{ps_http.base_url}/v1/tenant/{tenant_id}/attach",

View File

@@ -85,9 +85,9 @@ def test_change_pageserver(neon_env_builder: NeonEnvBuilder):
# the endpoint. Whereas the previous reconfiguration was like a healthy migration, this
# is more like what happens in an unexpected pageserver failure.
#
# Since we're dual-attached, need to tip-off attachment service to treat the one we're
# Since we're dual-attached, need to tip-off storage controller to treat the one we're
# about to start as the attached pageserver
env.attachment_service.attach_hook_issue(env.initial_tenant, env.pageservers[0].id)
env.storage_controller.attach_hook_issue(env.initial_tenant, env.pageservers[0].id)
env.pageservers[0].start()
env.pageservers[1].stop()
@@ -97,9 +97,9 @@ def test_change_pageserver(neon_env_builder: NeonEnvBuilder):
assert fetchone() == (100000,)
env.pageservers[0].stop()
# Since we're dual-attached, need to tip-off attachment service to treat the one we're
# Since we're dual-attached, need to tip-off storage controller to treat the one we're
# about to start as the attached pageserver
env.attachment_service.attach_hook_issue(env.initial_tenant, env.pageservers[1].id)
env.storage_controller.attach_hook_issue(env.initial_tenant, env.pageservers[1].id)
env.pageservers[1].start()
# Test a (former) bug where a child process spins without updating its connection string

View File

@@ -133,7 +133,7 @@ def test_create_snapshot(
for sk in env.safekeepers:
sk.stop()
env.pageserver.stop()
env.attachment_service.stop()
env.storage_controller.stop()
# Directory `compatibility_snapshot_dir` is uploaded to S3 in a workflow, keep the name in sync with it
compatibility_snapshot_dir = (
@@ -242,7 +242,7 @@ def test_forward_compatibility(
# everything else: our test code is written for latest CLI args.
env.neon_local_binpath = neon_local_binpath
neon_env_builder.start()
neon_env_builder.start(register_pageservers=True)
check_neon_works(
env,

View File

@@ -159,7 +159,7 @@ def test_issue_5878(neon_env_builder: NeonEnvBuilder):
time.sleep(1.1) # so that we can use change in pre_stat.st_mtime to detect overwrites
def get_generation_number():
attachment = env.attachment_service.inspect(tenant_id)
attachment = env.storage_controller.inspect(tenant_id)
assert attachment is not None
return attachment[0]

View File

@@ -133,7 +133,7 @@ def test_cli_start_stop(neon_env_builder: NeonEnvBuilder):
# Stop default ps/sk
env.neon_cli.pageserver_stop(env.pageserver.id)
env.neon_cli.safekeeper_stop()
env.neon_cli.attachment_service_stop(False)
env.neon_cli.storage_controller_stop(False)
# Keep NeonEnv state up to date, it usually owns starting/stopping services
env.pageserver.running = False
@@ -175,7 +175,7 @@ def test_cli_start_stop_multi(neon_env_builder: NeonEnvBuilder):
env.neon_cli.safekeeper_stop(neon_env_builder.safekeepers_id_start + 2)
# Stop this to get out of the way of the following `start`
env.neon_cli.attachment_service_stop(False)
env.neon_cli.storage_controller_stop(False)
# Default start
res = env.neon_cli.raw_cli(["start"])

View File

@@ -73,7 +73,7 @@ def check_client(env: NeonEnv, client: PageserverHttpClient):
# create new tenant and check it is also there
tenant_id = TenantId.generate()
client.tenant_create(
tenant_id, generation=env.attachment_service.attach_hook_issue(tenant_id, env.pageserver.id)
tenant_id, generation=env.storage_controller.attach_hook_issue(tenant_id, env.pageserver.id)
)
assert tenant_id in {TenantId(t["id"]) for t in client.tenant_list()}

View File

@@ -203,7 +203,10 @@ def test_generations_upgrade(neon_env_builder: NeonEnvBuilder):
env.broker.try_start()
for sk in env.safekeepers:
sk.start()
env.attachment_service.start()
env.storage_controller.start()
# We will start a pageserver with no control_plane_api set, so it won't be able to self-register
env.storage_controller.node_register(env.pageserver)
env.pageserver.start(overrides=('--pageserver-config-override=control_plane_api=""',))
@@ -285,7 +288,7 @@ def test_deferred_deletion(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_pageservers = 2
env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF)
attached_to_id = env.attachment_service.locate(env.initial_tenant)[0]["node_id"]
attached_to_id = env.storage_controller.locate(env.initial_tenant)[0]["node_id"]
main_pageserver = env.get_pageserver(attached_to_id)
other_pageserver = [p for p in env.pageservers if p.id != attached_to_id][0]
@@ -310,7 +313,7 @@ def test_deferred_deletion(neon_env_builder: NeonEnvBuilder):
# Now advance the generation in the control plane: subsequent validations
# from the running pageserver will fail. No more deletions should happen.
env.attachment_service.attach_hook_issue(env.initial_tenant, other_pageserver.id)
env.storage_controller.attach_hook_issue(env.initial_tenant, other_pageserver.id)
generate_uploads_and_deletions(env, init=False, pageserver=main_pageserver)
assert_deletion_queue(ps_http, lambda n: n > 0)
@@ -366,7 +369,7 @@ def test_deletion_queue_recovery(
neon_env_builder.num_pageservers = 2
env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF)
attached_to_id = env.attachment_service.locate(env.initial_tenant)[0]["node_id"]
attached_to_id = env.storage_controller.locate(env.initial_tenant)[0]["node_id"]
main_pageserver = env.get_pageserver(attached_to_id)
other_pageserver = [p for p in env.pageservers if p.id != attached_to_id][0]
@@ -428,7 +431,7 @@ def test_deletion_queue_recovery(
if keep_attachment == KeepAttachment.LOSE:
some_other_pageserver = other_pageserver.id
env.attachment_service.attach_hook_issue(env.initial_tenant, some_other_pageserver)
env.storage_controller.attach_hook_issue(env.initial_tenant, some_other_pageserver)
main_pageserver.start()
@@ -494,7 +497,7 @@ def test_emergency_mode(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
)
# Simulate a major incident: the control plane goes offline
env.attachment_service.stop()
env.storage_controller.stop()
# Remember how many validations had happened before the control plane went offline
validated = get_deletion_queue_validated(ps_http)
@@ -511,7 +514,6 @@ def test_emergency_mode(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
env.pageserver.stop() # Non-immediate: implicitly checking that shutdown doesn't hang waiting for CP
env.pageserver.start(
overrides=("--pageserver-config-override=control_plane_emergency_mode=true",),
register=False,
)
# The pageserver should provide service to clients
@@ -525,7 +527,7 @@ def test_emergency_mode(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
assert get_deletion_queue_executed(ps_http) == 0
# When the control plane comes back up, normal service should resume
env.attachment_service.start()
env.storage_controller.start()
ps_http.deletion_queue_flush(execute=True)
assert get_deletion_queue_depth(ps_http) == 0

View File

@@ -157,7 +157,7 @@ def test_location_conf_churn(neon_env_builder: NeonEnvBuilder, seed: int):
workload.churn_rows(rng.randint(128, 256), pageserver.id)
workload.validate(pageserver.id)
elif last_state_ps[0].startswith("Attached"):
# The `attachment_service` will only re-attach on startup when a pageserver was the
# The `storage_controller` will only re-attach on startup when a pageserver was the
# holder of the latest generation: otherwise the pageserver will revert to detached
# state if it was running attached with a stale generation
last_state[pageserver.id] = ("Detached", None)
@@ -182,12 +182,12 @@ def test_location_conf_churn(neon_env_builder: NeonEnvBuilder, seed: int):
generation = last_state_ps[1]
else:
# Switch generations, while also jumping between attached states
generation = env.attachment_service.attach_hook_issue(
generation = env.storage_controller.attach_hook_issue(
tenant_id, pageserver.id
)
latest_attached = pageserver.id
else:
generation = env.attachment_service.attach_hook_issue(tenant_id, pageserver.id)
generation = env.storage_controller.attach_hook_issue(tenant_id, pageserver.id)
latest_attached = pageserver.id
else:
generation = None
@@ -273,7 +273,7 @@ def test_live_migration(neon_env_builder: NeonEnvBuilder):
# Encourage the new location to download while still in secondary mode
pageserver_b.http_client().tenant_secondary_download(tenant_id)
migrated_generation = env.attachment_service.attach_hook_issue(tenant_id, pageserver_b.id)
migrated_generation = env.storage_controller.attach_hook_issue(tenant_id, pageserver_b.id)
log.info(f"Acquired generation {migrated_generation} for destination pageserver")
assert migrated_generation == initial_generation + 1
@@ -436,7 +436,7 @@ def test_secondary_downloads(neon_env_builder: NeonEnvBuilder):
remote_storage_kind=RemoteStorageKind.MOCK_S3,
)
env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF)
assert env.attachment_service is not None
assert env.storage_controller is not None
assert isinstance(env.pageserver_remote_storage, S3Storage) # Satisfy linter
tenant_id = env.initial_tenant

View File

@@ -564,3 +564,35 @@ async def test_sql_over_http2(static_proxy: NeonProxy):
"select 42 as answer", [], user="http", password="http", expected_code=200
)
assert resp["rows"] == [{"answer": 42}]
def test_sql_over_http_timeout_cancel(static_proxy: NeonProxy):
static_proxy.safe_psql("create role http with login password 'http' superuser")
static_proxy.safe_psql("create table test_table ( id int primary key )")
# insert into a table, with a unique constraint, after sleeping for n seconds
query = "WITH temp AS ( \
SELECT pg_sleep($1) as sleep, $2::int as id \
) INSERT INTO test_table (id) SELECT id FROM temp"
# expect to fail with timeout
res = static_proxy.http_query(
query,
[static_proxy.http_timeout_seconds + 1, 1],
user="http",
password="http",
expected_code=400,
)
assert "Query cancelled, runtime exceeded" in res["message"], "HTTP query should time out"
time.sleep(2)
res = static_proxy.http_query(query, [1, 1], user="http", password="http", expected_code=200)
assert res["command"] == "INSERT", "HTTP query should insert"
assert res["rowCount"] == 1, "HTTP query should insert"
res = static_proxy.http_query(query, [0, 1], user="http", password="http", expected_code=400)
assert (
"duplicate key value violates unique constraint" in res["message"]
), "HTTP query should conflict"

View File

@@ -169,7 +169,7 @@ def test_remote_storage_backup_and_restore(
# Ensure that even though the tenant is broken, retrying the attachment fails
with pytest.raises(Exception, match="Tenant state is Broken"):
# Use same generation as in previous attempt
gen_state = env.attachment_service.inspect(tenant_id)
gen_state = env.storage_controller.inspect(tenant_id)
assert gen_state is not None
generation = gen_state[0]
env.pageserver.tenant_attach(tenant_id, generation=generation)
@@ -355,7 +355,7 @@ def test_remote_storage_upload_queue_retries(
env.pageserver.stop(immediate=True)
env.endpoints.stop_all()
# We are about to forcibly drop local dirs. Attachment service will increment generation in re-attach before
# We are about to forcibly drop local dirs. The storage controller will increment the generation during re-attach,
# and we will increment it again when actually attaching the tenant, so a generation is skipped; that can produce
# these warnings if there was a durable but un-executed deletion list at the time of restart.
env.pageserver.allowed_errors.extend(

View File

@@ -80,7 +80,7 @@ def test_tenant_s3_restore(
assert (
ps_http.get_metric_value("pageserver_tenant_manager_slots") == 0
), "tenant removed before we deletion was issued"
env.attachment_service.attach_hook_drop(tenant_id)
env.storage_controller.attach_hook_drop(tenant_id)
tenant_path = env.pageserver.tenant_dir(tenant_id)
assert not tenant_path.exists()
@@ -103,7 +103,7 @@ def test_tenant_s3_restore(
tenant_id, timestamp=ts_before_deletion, done_if_after=ts_after_deletion
)
generation = env.attachment_service.attach_hook_issue(tenant_id, env.pageserver.id)
generation = env.storage_controller.attach_hook_issue(tenant_id, env.pageserver.id)
ps_http.tenant_attach(tenant_id, generation=generation)
env.pageserver.quiesce_tenants()

View File

@@ -1,4 +1,5 @@
import os
from typing import Dict, List, Union
import pytest
from fixtures.log_helper import log
@@ -8,7 +9,11 @@ from fixtures.neon_fixtures import (
)
from fixtures.remote_storage import s3_storage
from fixtures.types import Lsn, TenantShardId, TimelineId
from fixtures.utils import wait_until
from fixtures.workload import Workload
from pytest_httpserver import HTTPServer
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response
def test_sharding_smoke(
@@ -43,7 +48,7 @@ def test_sharding_smoke(
tenant_id = env.initial_tenant
pageservers = dict((int(p.id), p) for p in env.pageservers)
shards = env.attachment_service.locate(tenant_id)
shards = env.storage_controller.locate(tenant_id)
def get_sizes():
sizes = {}
@@ -86,7 +91,7 @@ def test_sharding_smoke(
)
assert timelines == {env.initial_timeline, timeline_b}
env.attachment_service.consistency_check()
env.storage_controller.consistency_check()
def test_sharding_split_unsharded(
@@ -102,7 +107,7 @@ def test_sharding_split_unsharded(
# Check that we created with an unsharded TenantShardId: this is the default,
# but check it in case we change the default in future
assert env.attachment_service.inspect(TenantShardId(tenant_id, 0, 0)) is not None
assert env.storage_controller.inspect(TenantShardId(tenant_id, 0, 0)) is not None
workload = Workload(env, tenant_id, timeline_id, branch_name="main")
workload.init()
@@ -110,15 +115,15 @@ def test_sharding_split_unsharded(
workload.validate()
# Split one shard into two
env.attachment_service.tenant_shard_split(tenant_id, shard_count=2)
env.storage_controller.tenant_shard_split(tenant_id, shard_count=2)
# Check we got the shard IDs we expected
assert env.attachment_service.inspect(TenantShardId(tenant_id, 0, 2)) is not None
assert env.attachment_service.inspect(TenantShardId(tenant_id, 1, 2)) is not None
assert env.storage_controller.inspect(TenantShardId(tenant_id, 0, 2)) is not None
assert env.storage_controller.inspect(TenantShardId(tenant_id, 1, 2)) is not None
workload.validate()
env.attachment_service.consistency_check()
env.storage_controller.consistency_check()
def test_sharding_split_smoke(
@@ -161,7 +166,7 @@ def test_sharding_split_smoke(
workload.write_rows(256)
# Note which pageservers initially hold a shard after tenant creation
pre_split_pageserver_ids = [loc["node_id"] for loc in env.attachment_service.locate(tenant_id)]
pre_split_pageserver_ids = [loc["node_id"] for loc in env.storage_controller.locate(tenant_id)]
# For pageservers holding a shard, validate their ingest statistics
# reflect a proper splitting of the WAL.
@@ -213,9 +218,9 @@ def test_sharding_split_smoke(
# Before split, old shards exist
assert shards_on_disk(old_shard_ids)
env.attachment_service.tenant_shard_split(tenant_id, shard_count=split_shard_count)
env.storage_controller.tenant_shard_split(tenant_id, shard_count=split_shard_count)
post_split_pageserver_ids = [loc["node_id"] for loc in env.attachment_service.locate(tenant_id)]
post_split_pageserver_ids = [loc["node_id"] for loc in env.storage_controller.locate(tenant_id)]
# We should have split into 8 shards, on the same 4 pageservers we started on.
assert len(post_split_pageserver_ids) == split_shard_count
assert len(set(post_split_pageserver_ids)) == shard_count
@@ -261,7 +266,7 @@ def test_sharding_split_smoke(
# Check that we didn't do any spurious reconciliations.
# Total number of reconciles should have been one per original shard, plus
# one for each shard that was migrated.
reconcile_ok = env.attachment_service.get_metric_value(
reconcile_ok = env.storage_controller.get_metric_value(
"storage_controller_reconcile_complete_total", filter={"status": "ok"}
)
assert reconcile_ok == shard_count + split_shard_count // 2
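Concretely, with the values implied by the assertions above (shard_count = 4 original shards, split_shard_count = 8 after the split), half of the 8 post-split shards are migrated to other pageservers, so the expected total is 4 + 8 // 2 = 8 successful reconciliations.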
@@ -269,19 +274,19 @@ def test_sharding_split_smoke(
# Check that no cancelled or errored reconciliations occurred: this test does no
# failure injection and should run clean.
assert (
env.attachment_service.get_metric_value(
env.storage_controller.get_metric_value(
"storage_controller_reconcile_complete_total", filter={"status": "cancel"}
)
is None
)
assert (
env.attachment_service.get_metric_value(
env.storage_controller.get_metric_value(
"storage_controller_reconcile_complete_total", filter={"status": "error"}
)
is None
)
env.attachment_service.consistency_check()
env.storage_controller.consistency_check()
# Validate pageserver state
shards_exist: list[TenantShardId] = []
@@ -310,6 +315,96 @@ def test_sharding_split_smoke(
workload.validate()
@pytest.mark.parametrize("initial_stripe_size", [None, 65536])
def test_sharding_split_stripe_size(
neon_env_builder: NeonEnvBuilder,
httpserver: HTTPServer,
httpserver_listen_address,
initial_stripe_size: int,
):
"""
Check that modifying stripe size inline with a shard split works as expected
"""
(host, port) = httpserver_listen_address
neon_env_builder.control_plane_compute_hook_api = f"http://{host}:{port}/notify"
neon_env_builder.num_pageservers = 1
# Set up fake HTTP notify endpoint: we will use this to validate that we receive
# the correct stripe size after split.
notifications = []
def handler(request: Request):
log.info(f"Notify request: {request}")
notifications.append(request.json)
return Response(status=200)
httpserver.expect_request("/notify", method="PUT").respond_with_handler(handler)
env = neon_env_builder.init_start(
initial_tenant_shard_count=1, initial_tenant_shard_stripe_size=initial_stripe_size
)
tenant_id = env.initial_tenant
assert len(notifications) == 1
expect: Dict[str, Union[List[Dict[str, int]], str, None, int]] = {
"tenant_id": str(env.initial_tenant),
"stripe_size": None,
"shards": [{"node_id": int(env.pageservers[0].id), "shard_number": 0}],
}
assert notifications[0] == expect
new_stripe_size = 2048
env.storage_controller.tenant_shard_split(
tenant_id, shard_count=2, shard_stripe_size=new_stripe_size
)
# Check that we ended up with the stripe size that we expected, both on the pageserver
# and in the notifications to compute
assert len(notifications) == 2
expect_after: Dict[str, Union[List[Dict[str, int]], str, None, int]] = {
"tenant_id": str(env.initial_tenant),
"stripe_size": new_stripe_size,
"shards": [
{"node_id": int(env.pageservers[0].id), "shard_number": 0},
{"node_id": int(env.pageservers[0].id), "shard_number": 1},
],
}
log.info(f"Got notification: {notifications[1]}")
assert notifications[1] == expect_after
# Inspect the stripe size on the pageserver
shard_0_loc = (
env.pageservers[0].http_client().tenant_get_location(TenantShardId(tenant_id, 0, 2))
)
assert shard_0_loc["shard_stripe_size"] == new_stripe_size
shard_1_loc = (
env.pageservers[0].http_client().tenant_get_location(TenantShardId(tenant_id, 1, 2))
)
assert shard_1_loc["shard_stripe_size"] == new_stripe_size
# Ensure stripe size survives a pageserver restart
env.pageservers[0].stop()
env.pageservers[0].start()
shard_0_loc = (
env.pageservers[0].http_client().tenant_get_location(TenantShardId(tenant_id, 0, 2))
)
assert shard_0_loc["shard_stripe_size"] == new_stripe_size
shard_1_loc = (
env.pageservers[0].http_client().tenant_get_location(TenantShardId(tenant_id, 1, 2))
)
assert shard_1_loc["shard_stripe_size"] == new_stripe_size
# Ensure stripe size survives a storage controller restart
env.storage_controller.stop()
env.storage_controller.start()
def assert_restart_notification():
assert len(notifications) == 3
assert notifications[2] == expect_after
wait_until(10, 1, assert_restart_notification)
@pytest.mark.skipif(
# The quantity of data isn't huge, but debug can be _very_ slow, and the things we're
# validating in this test don't benefit much from debug assertions.
@@ -360,7 +455,7 @@ def test_sharding_ingest(
huge_layer_count = 0
# Inspect the resulting layer map, count how many layers are undersized.
for shard in env.attachment_service.locate(tenant_id):
for shard in env.storage_controller.locate(tenant_id):
pageserver = env.get_pageserver(shard["node_id"])
shard_id = shard["shard_id"]
layer_map = pageserver.http_client().layer_map_info(shard_id, timeline_id)

View File

@@ -6,10 +6,10 @@ from typing import Any, Dict, List, Union
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
AttachmentServiceApiException,
NeonEnv,
NeonEnvBuilder,
PgBin,
StorageControllerApiException,
TokenScope,
)
from fixtures.pageserver.http import PageserverHttpClient
@@ -36,7 +36,7 @@ from werkzeug.wrappers.response import Response
def get_node_shard_counts(env: NeonEnv, tenant_ids):
counts: defaultdict[str, int] = defaultdict(int)
for tid in tenant_ids:
for shard in env.attachment_service.locate(tid):
for shard in env.storage_controller.locate(tid):
counts[shard["node_id"]] += 1
return counts
@@ -62,20 +62,20 @@ def test_sharding_service_smoke(
# Start services by hand so that we can skip a pageserver (this will start + register later)
env.broker.try_start()
env.attachment_service.start()
env.storage_controller.start()
env.pageservers[0].start()
env.pageservers[1].start()
for sk in env.safekeepers:
sk.start()
# The pageservers we started should have registered with the sharding service on startup
nodes = env.attachment_service.node_list()
nodes = env.storage_controller.node_list()
assert len(nodes) == 2
assert set(n["id"] for n in nodes) == {env.pageservers[0].id, env.pageservers[1].id}
# Starting an additional pageserver should register successfully
env.pageservers[2].start()
nodes = env.attachment_service.node_list()
nodes = env.storage_controller.node_list()
assert len(nodes) == 3
assert set(n["id"] for n in nodes) == {ps.id for ps in env.pageservers}
@@ -99,22 +99,22 @@ def test_sharding_service_smoke(
# Creating and deleting timelines should work, using identical API to pageserver
timeline_crud_tenant = next(iter(tenant_ids))
timeline_id = TimelineId.generate()
env.attachment_service.pageserver_api().timeline_create(
env.storage_controller.pageserver_api().timeline_create(
pg_version=PgVersion.NOT_SET, tenant_id=timeline_crud_tenant, new_timeline_id=timeline_id
)
timelines = env.attachment_service.pageserver_api().timeline_list(timeline_crud_tenant)
timelines = env.storage_controller.pageserver_api().timeline_list(timeline_crud_tenant)
assert len(timelines) == 2
assert timeline_id in set(TimelineId(t["timeline_id"]) for t in timelines)
# virtual_ps_http.timeline_delete(tenant_id=timeline_crud_tenant, timeline_id=timeline_id)
timeline_delete_wait_completed(
env.attachment_service.pageserver_api(), timeline_crud_tenant, timeline_id
env.storage_controller.pageserver_api(), timeline_crud_tenant, timeline_id
)
timelines = env.attachment_service.pageserver_api().timeline_list(timeline_crud_tenant)
timelines = env.storage_controller.pageserver_api().timeline_list(timeline_crud_tenant)
assert len(timelines) == 1
assert timeline_id not in set(TimelineId(t["timeline_id"]) for t in timelines)
# Marking a pageserver offline should migrate tenants away from it.
env.attachment_service.node_configure(env.pageservers[0].id, {"availability": "Offline"})
env.storage_controller.node_configure(env.pageservers[0].id, {"availability": "Offline"})
def node_evacuated(node_id: int) -> None:
counts = get_node_shard_counts(env, tenant_ids)
@@ -124,7 +124,7 @@ def test_sharding_service_smoke(
# Marking pageserver active should not migrate anything to it
# immediately
env.attachment_service.node_configure(env.pageservers[0].id, {"availability": "Active"})
env.storage_controller.node_configure(env.pageservers[0].id, {"availability": "Active"})
time.sleep(1)
assert get_node_shard_counts(env, tenant_ids)[env.pageservers[0].id] == 0
@@ -144,13 +144,13 @@ def test_sharding_service_smoke(
# Delete all the tenants
for tid in tenant_ids:
tenant_delete_wait_completed(env.attachment_service.pageserver_api(), tid, 10)
tenant_delete_wait_completed(env.storage_controller.pageserver_api(), tid, 10)
env.attachment_service.consistency_check()
env.storage_controller.consistency_check()
# Set a scheduling policy on one node, create all the tenants, observe
# that the scheduling policy is respected.
env.attachment_service.node_configure(env.pageservers[1].id, {"scheduling": "Draining"})
env.storage_controller.node_configure(env.pageservers[1].id, {"scheduling": "Draining"})
# Create some fresh tenants
tenant_ids = set(TenantId.generate() for i in range(0, tenant_count))
@@ -163,7 +163,7 @@ def test_sharding_service_smoke(
assert counts[env.pageservers[0].id] == tenant_shard_count // 2
assert counts[env.pageservers[2].id] == tenant_shard_count // 2
env.attachment_service.consistency_check()
env.storage_controller.consistency_check()
def test_node_status_after_restart(
@@ -173,28 +173,28 @@ def test_node_status_after_restart(
env = neon_env_builder.init_start()
# Initially we have two online pageservers
nodes = env.attachment_service.node_list()
nodes = env.storage_controller.node_list()
assert len(nodes) == 2
env.pageservers[1].stop()
env.attachment_service.stop()
env.attachment_service.start()
env.storage_controller.stop()
env.storage_controller.start()
def is_ready():
assert env.attachment_service.ready() is True
assert env.storage_controller.ready() is True
wait_until(30, 1, is_ready)
# We loaded nodes from database on restart
nodes = env.attachment_service.node_list()
nodes = env.storage_controller.node_list()
assert len(nodes) == 2
# We should still be able to create a tenant, because the pageserver which is still online
# should have had its availability state set to Active.
env.attachment_service.tenant_create(TenantId.generate())
env.storage_controller.tenant_create(TenantId.generate())
env.attachment_service.consistency_check()
env.storage_controller.consistency_check()
def test_sharding_service_passthrough(
@@ -208,9 +208,9 @@ def test_sharding_service_passthrough(
neon_env_builder.num_pageservers = 2
env = neon_env_builder.init_start()
# We will talk to attachment service as if it was a pageserver, using the pageserver
# We will talk to storage controller as if it was a pageserver, using the pageserver
# HTTP client
client = PageserverHttpClient(env.attachment_service_port, lambda: True)
client = PageserverHttpClient(env.storage_controller_port, lambda: True)
timelines = client.timeline_list(tenant_id=env.initial_tenant)
assert len(timelines) == 1
@@ -221,22 +221,22 @@ def test_sharding_service_passthrough(
}
assert status["state"]["slug"] == "Active"
env.attachment_service.consistency_check()
env.storage_controller.consistency_check()
def test_sharding_service_restart(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
tenant_a = env.initial_tenant
tenant_b = TenantId.generate()
env.attachment_service.tenant_create(tenant_b)
env.storage_controller.tenant_create(tenant_b)
env.pageserver.tenant_detach(tenant_a)
# TODO: extend this test to use multiple pageservers, and check that locations don't move around
# on restart.
# Attachment service restart
env.attachment_service.stop()
env.attachment_service.start()
# Storage controller restart
env.storage_controller.stop()
env.storage_controller.start()
observed = set(TenantId(tenant["id"]) for tenant in env.pageserver.http_client().tenant_list())
@@ -255,7 +255,7 @@ def test_sharding_service_restart(neon_env_builder: NeonEnvBuilder):
assert tenant_a not in observed
assert tenant_b in observed
env.attachment_service.consistency_check()
env.storage_controller.consistency_check()
@pytest.mark.parametrize("warm_up", [True, False])
@@ -271,27 +271,26 @@ def test_sharding_service_onboarding(neon_env_builder: NeonEnvBuilder, warm_up:
# Start services by hand so that we can skip registration on one of the pageservers
env = neon_env_builder.init_configs()
env.broker.try_start()
env.attachment_service.start()
env.storage_controller.start()
# This is the pageserver where we'll initially create the tenant. Run it in emergency
# mode so that it doesn't talk to storage controller, and do not register it.
env.pageservers[0].allowed_errors.append(".*Emergency mode!.*")
env.pageservers[0].start(
overrides=("--pageserver-config-override=control_plane_emergency_mode=true",),
register=False,
)
origin_ps = env.pageservers[0]
# This is the pageserver managed by the sharding service, where the tenant
# will be attached after onboarding
env.pageservers[1].start(register=True)
env.pageservers[1].start()
dest_ps = env.pageservers[1]
virtual_ps_http = PageserverHttpClient(env.attachment_service_port, lambda: True)
virtual_ps_http = PageserverHttpClient(env.storage_controller_port, lambda: True)
for sk in env.safekeepers:
sk.start()
# Create a tenant directly via pageserver HTTP API, skipping the attachment service
# Create a tenant directly via pageserver HTTP API, skipping the storage controller
tenant_id = TenantId.generate()
generation = 123
origin_ps.http_client().tenant_create(tenant_id, generation=generation)
@@ -324,7 +323,7 @@ def test_sharding_service_onboarding(neon_env_builder: NeonEnvBuilder, warm_up:
virtual_ps_http.tenant_secondary_download(tenant_id)
# Call into attachment service to onboard the tenant
# Call into storage controller to onboard the tenant
generation += 1
virtual_ps_http.tenant_location_conf(
tenant_id,
@@ -347,7 +346,7 @@ def test_sharding_service_onboarding(neon_env_builder: NeonEnvBuilder, warm_up:
},
)
# As if doing a live migration, call into the attachment service to
# As if doing a live migration, call into the storage controller to
# set it to AttachedSingle: this is a no-op, but we test it because the
# cloud control plane may call this for symmetry with live migration to
# an individual pageserver
@@ -375,8 +374,8 @@ def test_sharding_service_onboarding(neon_env_builder: NeonEnvBuilder, warm_up:
assert dest_tenants[0]["generation"] == generation + 1
# The onboarded tenant should survive a restart of sharding service
env.attachment_service.stop()
env.attachment_service.start()
env.storage_controller.stop()
env.storage_controller.start()
# The onboarded tenant should survive a restart of the pageserver
dest_ps.stop()
@@ -407,7 +406,7 @@ def test_sharding_service_onboarding(neon_env_builder: NeonEnvBuilder, warm_up:
dest_tenant_conf_after = dest_ps.http_client().tenant_config(tenant_id)
assert dest_tenant_conf_after.tenant_specific_overrides == modified_tenant_conf
env.attachment_service.consistency_check()
env.storage_controller.consistency_check()
def test_sharding_service_compute_hook(
@@ -419,7 +418,7 @@ def test_sharding_service_compute_hook(
Test that the sharding service calls out to the configured HTTP endpoint on attachment changes
"""
# We will run two pageserver to migrate and check that the attachment service sends notifications
# We will run two pageservers to migrate between, and check that the storage controller sends notifications
# when migrating.
neon_env_builder.num_pageservers = 2
(host, port) = httpserver_listen_address
@@ -450,7 +449,7 @@ def test_sharding_service_compute_hook(
}
assert notifications[0] == expect
env.attachment_service.node_configure(env.pageservers[0].id, {"availability": "Offline"})
env.storage_controller.node_configure(env.pageservers[0].id, {"availability": "Offline"})
def node_evacuated(node_id: int) -> None:
counts = get_node_shard_counts(env, [env.initial_tenant])
@@ -473,8 +472,8 @@ def test_sharding_service_compute_hook(
wait_until(20, 0.25, received_migration_notification)
# When we restart, we should re-emit notifications for all tenants
env.attachment_service.stop()
env.attachment_service.start()
env.storage_controller.stop()
env.storage_controller.start()
def received_restart_notification():
assert len(notifications) == 3
@@ -483,7 +482,7 @@ def test_sharding_service_compute_hook(
wait_until(10, 1, received_restart_notification)
# Splitting a tenant should cause its stripe size to become visible in the compute notification
env.attachment_service.tenant_shard_split(env.initial_tenant, shard_count=2)
env.storage_controller.tenant_shard_split(env.initial_tenant, shard_count=2)
expect = {
"tenant_id": str(env.initial_tenant),
"stripe_size": 32768,
@@ -499,7 +498,7 @@ def test_sharding_service_compute_hook(
wait_until(10, 1, received_split_notification)
env.attachment_service.consistency_check()
env.storage_controller.consistency_check()
def test_sharding_service_debug_apis(neon_env_builder: NeonEnvBuilder):
@@ -512,55 +511,55 @@ def test_sharding_service_debug_apis(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
tenant_id = TenantId.generate()
env.attachment_service.tenant_create(tenant_id, shard_count=2, shard_stripe_size=8192)
env.storage_controller.tenant_create(tenant_id, shard_count=2, shard_stripe_size=8192)
# Check that the consistency check passes on a freshly setup system
env.attachment_service.consistency_check()
env.storage_controller.consistency_check()
# These APIs are intentionally not implemented as methods on NeonAttachmentService, as
# These APIs are intentionally not implemented as methods on NeonStorageController, as
# they're just for use in unanticipated circumstances.
# Initial tenant (1 shard) and the one we just created (2 shards) should be visible
response = env.attachment_service.request(
response = env.storage_controller.request(
"GET",
f"{env.attachment_service_api}/debug/v1/tenant",
headers=env.attachment_service.headers(TokenScope.ADMIN),
f"{env.storage_controller_api}/debug/v1/tenant",
headers=env.storage_controller.headers(TokenScope.ADMIN),
)
assert len(response.json()) == 3
# Scheduler should report the expected nodes and shard counts
response = env.attachment_service.request(
"GET", f"{env.attachment_service_api}/debug/v1/scheduler"
response = env.storage_controller.request(
"GET", f"{env.storage_controller_api}/debug/v1/scheduler"
)
# Two nodes, in a dict of node_id->node
assert len(response.json()["nodes"]) == 2
assert sum(v["shard_count"] for v in response.json()["nodes"].values()) == 3
assert all(v["may_schedule"] for v in response.json()["nodes"].values())
response = env.attachment_service.request(
response = env.storage_controller.request(
"POST",
f"{env.attachment_service_api}/debug/v1/node/{env.pageservers[1].id}/drop",
headers=env.attachment_service.headers(TokenScope.ADMIN),
f"{env.storage_controller_api}/debug/v1/node/{env.pageservers[1].id}/drop",
headers=env.storage_controller.headers(TokenScope.ADMIN),
)
assert len(env.attachment_service.node_list()) == 1
assert len(env.storage_controller.node_list()) == 1
response = env.attachment_service.request(
response = env.storage_controller.request(
"POST",
f"{env.attachment_service_api}/debug/v1/tenant/{tenant_id}/drop",
headers=env.attachment_service.headers(TokenScope.ADMIN),
f"{env.storage_controller_api}/debug/v1/tenant/{tenant_id}/drop",
headers=env.storage_controller.headers(TokenScope.ADMIN),
)
# Tenant drop should be reflected in dump output
response = env.attachment_service.request(
response = env.storage_controller.request(
"GET",
f"{env.attachment_service_api}/debug/v1/tenant",
headers=env.attachment_service.headers(TokenScope.ADMIN),
f"{env.storage_controller_api}/debug/v1/tenant",
headers=env.storage_controller.headers(TokenScope.ADMIN),
)
assert len(response.json()) == 1
# Check that the 'drop' APIs didn't leave things in a state that would fail a consistency check: they're
# meant to be unclean wrt the pageserver state, but not leave a broken storage controller behind.
env.attachment_service.consistency_check()
env.storage_controller.consistency_check()
def test_sharding_service_s3_time_travel_recovery(
@@ -584,10 +583,10 @@ def test_sharding_service_s3_time_travel_recovery(
neon_env_builder.num_pageservers = 1
env = neon_env_builder.init_start()
virtual_ps_http = PageserverHttpClient(env.attachment_service_port, lambda: True)
virtual_ps_http = PageserverHttpClient(env.storage_controller_port, lambda: True)
tenant_id = TenantId.generate()
env.attachment_service.tenant_create(
env.storage_controller.tenant_create(
tenant_id,
shard_count=2,
shard_stripe_size=8192,
@@ -595,7 +594,7 @@ def test_sharding_service_s3_time_travel_recovery(
)
# Check that the consistency check passes
env.attachment_service.consistency_check()
env.storage_controller.consistency_check()
branch_name = "main"
timeline_id = env.neon_cli.create_timeline(
@@ -670,28 +669,28 @@ def test_sharding_service_s3_time_travel_recovery(
with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint:
endpoint.safe_psql("SELECT * FROM created_foo;")
env.attachment_service.consistency_check()
env.storage_controller.consistency_check()
def test_sharding_service_auth(neon_env_builder: NeonEnvBuilder):
neon_env_builder.auth_enabled = True
env = neon_env_builder.init_start()
svc = env.attachment_service
api = env.attachment_service_api
svc = env.storage_controller
api = env.storage_controller_api
tenant_id = TenantId.generate()
body: Dict[str, Any] = {"new_tenant_id": str(tenant_id)}
# No token
with pytest.raises(
AttachmentServiceApiException,
StorageControllerApiException,
match="Unauthorized: missing authorization header",
):
svc.request("POST", f"{env.attachment_service_api}/v1/tenant", json=body)
svc.request("POST", f"{env.storage_controller_api}/v1/tenant", json=body)
# Token with incorrect scope
with pytest.raises(
AttachmentServiceApiException,
StorageControllerApiException,
match="Forbidden: JWT authentication error",
):
svc.request("POST", f"{api}/v1/tenant", json=body, headers=svc.headers(TokenScope.ADMIN))
@@ -703,14 +702,14 @@ def test_sharding_service_auth(neon_env_builder: NeonEnvBuilder):
# No token
with pytest.raises(
AttachmentServiceApiException,
StorageControllerApiException,
match="Unauthorized: missing authorization header",
):
svc.request("GET", f"{api}/debug/v1/tenant")
# Token with incorrect scope
with pytest.raises(
AttachmentServiceApiException,
StorageControllerApiException,
match="Forbidden: JWT authentication error",
):
svc.request(
@@ -719,14 +718,14 @@ def test_sharding_service_auth(neon_env_builder: NeonEnvBuilder):
# No token
with pytest.raises(
AttachmentServiceApiException,
StorageControllerApiException,
match="Unauthorized: missing authorization header",
):
svc.request("POST", f"{api}/upcall/v1/re-attach")
# Token with incorrect scope
with pytest.raises(
AttachmentServiceApiException,
StorageControllerApiException,
match="Forbidden: JWT authentication error",
):
svc.request(
@@ -743,7 +742,7 @@ def test_sharding_service_tenant_conf(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
tenant_id = env.initial_tenant
http = env.attachment_service.pageserver_api()
http = env.storage_controller.pageserver_api()
default_value = "7days"
new_value = "1h"
@@ -769,4 +768,4 @@ def test_sharding_service_tenant_conf(neon_env_builder: NeonEnvBuilder):
assert readback_ps.effective_config["pitr_interval"] == default_value
assert "pitr_interval" not in readback_ps.tenant_specific_overrides
env.attachment_service.consistency_check()
env.storage_controller.consistency_check()

View File

@@ -1011,7 +1011,7 @@ def test_eager_attach_does_not_queue_up(neon_env_builder: NeonEnvBuilder):
resp = client.tenant_status(eager_tenant)
assert resp["state"]["slug"] == "Active"
gen = env.attachment_service.attach_hook_issue(eager_tenant, env.pageserver.id)
gen = env.storage_controller.attach_hook_issue(eager_tenant, env.pageserver.id)
client.tenant_location_conf(
eager_tenant,
{
@@ -1071,7 +1071,7 @@ def test_lazy_attach_activation(neon_env_builder: NeonEnvBuilder, activation_met
# attach, it will consume the only permit because logical size calculation
# is paused.
gen = env.attachment_service.attach_hook_issue(lazy_tenant, env.pageserver.id)
gen = env.storage_controller.attach_hook_issue(lazy_tenant, env.pageserver.id)
client.tenant_location_conf(
lazy_tenant,
{