mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-31 01:00:36 +00:00
Compare commits
9 Commits
jcsp/paths
...
lr-tests-c
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5598f712fa | ||
|
|
83855a907c | ||
|
|
1b41db8bdd | ||
|
|
bac06ea1ac | ||
|
|
7ae8364b0b | ||
|
|
1f7d54f987 | ||
|
|
580e136b2e | ||
|
|
09699d4bd8 | ||
|
|
89cf714890 |
2
Makefile
2
Makefile
@@ -51,7 +51,7 @@ CARGO_BUILD_FLAGS += $(filter -j1,$(MAKEFLAGS))
|
||||
CARGO_CMD_PREFIX += $(if $(filter n,$(MAKEFLAGS)),,+)
|
||||
# Force cargo not to print progress bar
|
||||
CARGO_CMD_PREFIX += CARGO_TERM_PROGRESS_WHEN=never CI=1
|
||||
# Set PQ_LIB_DIR to make sure `attachment_service` get linked with bundled libpq (through diesel)
|
||||
# Set PQ_LIB_DIR to make sure `storage_controller` get linked with bundled libpq (through diesel)
|
||||
CARGO_CMD_PREFIX += PQ_LIB_DIR=$(POSTGRES_INSTALL_DIR)/v16/lib
|
||||
|
||||
#
|
||||
|
||||
@@ -30,7 +30,7 @@ use pageserver_api::controller_api::{
|
||||
};
|
||||
use pageserver_api::upcall_api::{ReAttachRequest, ValidateRequest};
|
||||
|
||||
use control_plane::attachment_service::{AttachHookRequest, InspectRequest};
|
||||
use control_plane::storage_controller::{AttachHookRequest, InspectRequest};
|
||||
|
||||
/// State available to HTTP request handlers
|
||||
#[derive(Clone)]
|
||||
|
||||
@@ -1,9 +1,3 @@
|
||||
/// The attachment service mimics the aspects of the control plane API
|
||||
/// that are required for a pageserver to operate.
|
||||
///
|
||||
/// This enables running & testing pageservers without a full-blown
|
||||
/// deployment of the Neon cloud platform.
|
||||
///
|
||||
use anyhow::{anyhow, Context};
|
||||
use attachment_service::http::make_router;
|
||||
use attachment_service::metrics::preinitialize_metrics;
|
||||
|
||||
@@ -20,7 +20,7 @@ use crate::node::Node;
|
||||
|
||||
/// ## What do we store?
|
||||
///
|
||||
/// The attachment service does not store most of its state durably.
|
||||
/// The storage controller service does not store most of its state durably.
|
||||
///
|
||||
/// The essential things to store durably are:
|
||||
/// - generation numbers, as these must always advance monotonically to ensure data safety.
|
||||
@@ -34,7 +34,7 @@ use crate::node::Node;
|
||||
///
|
||||
/// ## Performance/efficiency
|
||||
///
|
||||
/// The attachment service does not go via the database for most things: there are
|
||||
/// The storage controller service does not go via the database for most things: there are
|
||||
/// a couple of places where we must, and where efficiency matters:
|
||||
/// - Incrementing generation numbers: the Reconciler has to wait for this to complete
|
||||
/// before it can attach a tenant, so this acts as a bound on how fast things like
|
||||
|
||||
@@ -8,7 +8,7 @@ use std::{
|
||||
};
|
||||
|
||||
use anyhow::Context;
|
||||
use control_plane::attachment_service::{
|
||||
use control_plane::storage_controller::{
|
||||
AttachHookRequest, AttachHookResponse, InspectRequest, InspectResponse,
|
||||
};
|
||||
use diesel::result::DatabaseErrorKind;
|
||||
@@ -839,7 +839,7 @@ impl Service {
|
||||
tenant_state.generation = Some(new_generation);
|
||||
} else {
|
||||
// This is a detach notification. We must update placement policy to avoid re-attaching
|
||||
// during background scheduling/reconciliation, or during attachment service restart.
|
||||
// during background scheduling/reconciliation, or during storage controller restart.
|
||||
assert!(attach_req.node_id.is_none());
|
||||
tenant_state.policy = PlacementPolicy::Detached;
|
||||
}
|
||||
@@ -922,6 +922,10 @@ impl Service {
|
||||
&self,
|
||||
reattach_req: ReAttachRequest,
|
||||
) -> Result<ReAttachResponse, ApiError> {
|
||||
if let Some(register_req) = reattach_req.register {
|
||||
self.node_register(register_req).await?;
|
||||
}
|
||||
|
||||
// Take a re-attach as indication that the node is available: this is a precursor to proper
|
||||
// heartbeating in https://github.com/neondatabase/neon/issues/6844
|
||||
self.node_configure(NodeConfigureRequest {
|
||||
@@ -2218,7 +2222,18 @@ impl Service {
|
||||
|
||||
// unwrap safety: we would have returned above if we didn't find at least one shard to split
|
||||
let old_shard_count = old_shard_count.unwrap();
|
||||
let shard_ident = shard_ident.unwrap();
|
||||
let shard_ident = if let Some(new_stripe_size) = split_req.new_stripe_size {
|
||||
// This ShardIdentity will be used as the template for all children, so this implicitly
|
||||
// applies the new stripe size to the children.
|
||||
let mut shard_ident = shard_ident.unwrap();
|
||||
if shard_ident.count.count() > 1 && shard_ident.stripe_size != new_stripe_size {
|
||||
return Err(ApiError::BadRequest(anyhow::anyhow!("Attempted to change stripe size ({:?}->{new_stripe_size:?}) on a tenant with multiple shards", shard_ident.stripe_size)));
|
||||
}
|
||||
shard_ident.stripe_size = new_stripe_size;
|
||||
shard_ident
|
||||
} else {
|
||||
shard_ident.unwrap()
|
||||
};
|
||||
let policy = policy.unwrap();
|
||||
|
||||
// FIXME: we have dropped self.inner lock, and not yet written anything to the database: another
|
||||
@@ -2310,6 +2325,7 @@ impl Service {
|
||||
*parent_id,
|
||||
TenantShardSplitRequest {
|
||||
new_shard_count: split_req.new_shard_count,
|
||||
new_stripe_size: split_req.new_stripe_size,
|
||||
},
|
||||
)
|
||||
.await
|
||||
|
||||
@@ -8,11 +8,11 @@
|
||||
use anyhow::{anyhow, bail, Context, Result};
|
||||
use clap::{value_parser, Arg, ArgAction, ArgMatches, Command, ValueEnum};
|
||||
use compute_api::spec::ComputeMode;
|
||||
use control_plane::attachment_service::AttachmentService;
|
||||
use control_plane::endpoint::ComputeControlPlane;
|
||||
use control_plane::local_env::{InitForceMode, LocalEnv};
|
||||
use control_plane::pageserver::{PageServerNode, PAGESERVER_REMOTE_STORAGE_DIR};
|
||||
use control_plane::safekeeper::SafekeeperNode;
|
||||
use control_plane::storage_controller::StorageController;
|
||||
use control_plane::{broker, local_env};
|
||||
use pageserver_api::controller_api::{
|
||||
NodeAvailability, NodeConfigureRequest, NodeSchedulingPolicy, PlacementPolicy,
|
||||
@@ -138,7 +138,7 @@ fn main() -> Result<()> {
|
||||
"start" => rt.block_on(handle_start_all(sub_args, &env)),
|
||||
"stop" => rt.block_on(handle_stop_all(sub_args, &env)),
|
||||
"pageserver" => rt.block_on(handle_pageserver(sub_args, &env)),
|
||||
"attachment_service" => rt.block_on(handle_attachment_service(sub_args, &env)),
|
||||
"storage_controller" => rt.block_on(handle_storage_controller(sub_args, &env)),
|
||||
"safekeeper" => rt.block_on(handle_safekeeper(sub_args, &env)),
|
||||
"endpoint" => rt.block_on(handle_endpoint(sub_args, &env)),
|
||||
"mappings" => handle_mappings(sub_args, &mut env),
|
||||
@@ -445,14 +445,14 @@ async fn handle_tenant(
|
||||
// If tenant ID was not specified, generate one
|
||||
let tenant_id = parse_tenant_id(create_match)?.unwrap_or_else(TenantId::generate);
|
||||
|
||||
// We must register the tenant with the attachment service, so
|
||||
// We must register the tenant with the storage controller, so
|
||||
// that when the pageserver restarts, it will be re-attached.
|
||||
let attachment_service = AttachmentService::from_env(env);
|
||||
attachment_service
|
||||
let storage_controller = StorageController::from_env(env);
|
||||
storage_controller
|
||||
.tenant_create(TenantCreateRequest {
|
||||
// Note that ::unsharded here isn't actually because the tenant is unsharded, its because the
|
||||
// attachment service expecfs a shard-naive tenant_id in this attribute, and the TenantCreateRequest
|
||||
// type is used both in attachment service (for creating tenants) and in pageserver (for creating shards)
|
||||
// storage controller expecfs a shard-naive tenant_id in this attribute, and the TenantCreateRequest
|
||||
// type is used both in storage controller (for creating tenants) and in pageserver (for creating shards)
|
||||
new_tenant_id: TenantShardId::unsharded(tenant_id),
|
||||
generation: None,
|
||||
shard_parameters: ShardParameters {
|
||||
@@ -476,9 +476,9 @@ async fn handle_tenant(
|
||||
.context("Failed to parse postgres version from the argument string")?;
|
||||
|
||||
// FIXME: passing None for ancestor_start_lsn is not kosher in a sharded world: we can't have
|
||||
// different shards picking different start lsns. Maybe we have to teach attachment service
|
||||
// different shards picking different start lsns. Maybe we have to teach storage controller
|
||||
// to let shard 0 branch first and then propagate the chosen LSN to other shards.
|
||||
attachment_service
|
||||
storage_controller
|
||||
.tenant_timeline_create(
|
||||
tenant_id,
|
||||
TimelineCreateRequest {
|
||||
@@ -528,8 +528,8 @@ async fn handle_tenant(
|
||||
let new_pageserver = get_pageserver(env, matches)?;
|
||||
let new_pageserver_id = new_pageserver.conf.id;
|
||||
|
||||
let attachment_service = AttachmentService::from_env(env);
|
||||
attachment_service
|
||||
let storage_controller = StorageController::from_env(env);
|
||||
storage_controller
|
||||
.tenant_migrate(tenant_shard_id, new_pageserver_id)
|
||||
.await?;
|
||||
|
||||
@@ -543,8 +543,8 @@ async fn handle_tenant(
|
||||
|
||||
let mut tenant_synthetic_size = None;
|
||||
|
||||
let attachment_service = AttachmentService::from_env(env);
|
||||
for shard in attachment_service.tenant_locate(tenant_id).await?.shards {
|
||||
let storage_controller = StorageController::from_env(env);
|
||||
for shard in storage_controller.tenant_locate(tenant_id).await?.shards {
|
||||
let pageserver =
|
||||
PageServerNode::from_env(env, env.get_pageserver_conf(shard.node_id)?);
|
||||
|
||||
@@ -585,10 +585,14 @@ async fn handle_tenant(
|
||||
Some(("shard-split", matches)) => {
|
||||
let tenant_id = get_tenant_id(matches, env)?;
|
||||
let shard_count: u8 = matches.get_one::<u8>("shard-count").cloned().unwrap_or(0);
|
||||
let shard_stripe_size: Option<ShardStripeSize> = matches
|
||||
.get_one::<Option<ShardStripeSize>>("shard-stripe-size")
|
||||
.cloned()
|
||||
.unwrap();
|
||||
|
||||
let attachment_service = AttachmentService::from_env(env);
|
||||
let result = attachment_service
|
||||
.tenant_split(tenant_id, shard_count)
|
||||
let storage_controller = StorageController::from_env(env);
|
||||
let result = storage_controller
|
||||
.tenant_split(tenant_id, shard_count, shard_stripe_size)
|
||||
.await?;
|
||||
println!(
|
||||
"Split tenant {} into shards {}",
|
||||
@@ -613,7 +617,7 @@ async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::Local
|
||||
|
||||
match timeline_match.subcommand() {
|
||||
Some(("list", list_match)) => {
|
||||
// TODO(sharding): this command shouldn't have to specify a shard ID: we should ask the attachment service
|
||||
// TODO(sharding): this command shouldn't have to specify a shard ID: we should ask the storage controller
|
||||
// where shard 0 is attached, and query there.
|
||||
let tenant_shard_id = get_tenant_shard_id(list_match, env)?;
|
||||
let timelines = pageserver.timeline_list(&tenant_shard_id).await?;
|
||||
@@ -633,7 +637,7 @@ async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::Local
|
||||
let new_timeline_id_opt = parse_timeline_id(create_match)?;
|
||||
let new_timeline_id = new_timeline_id_opt.unwrap_or(TimelineId::generate());
|
||||
|
||||
let attachment_service = AttachmentService::from_env(env);
|
||||
let storage_controller = StorageController::from_env(env);
|
||||
let create_req = TimelineCreateRequest {
|
||||
new_timeline_id,
|
||||
ancestor_timeline_id: None,
|
||||
@@ -641,7 +645,7 @@ async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::Local
|
||||
ancestor_start_lsn: None,
|
||||
pg_version: Some(pg_version),
|
||||
};
|
||||
let timeline_info = attachment_service
|
||||
let timeline_info = storage_controller
|
||||
.tenant_timeline_create(tenant_id, create_req)
|
||||
.await?;
|
||||
|
||||
@@ -730,7 +734,7 @@ async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::Local
|
||||
.transpose()
|
||||
.context("Failed to parse ancestor start Lsn from the request")?;
|
||||
let new_timeline_id = TimelineId::generate();
|
||||
let attachment_service = AttachmentService::from_env(env);
|
||||
let storage_controller = StorageController::from_env(env);
|
||||
let create_req = TimelineCreateRequest {
|
||||
new_timeline_id,
|
||||
ancestor_timeline_id: Some(ancestor_timeline_id),
|
||||
@@ -738,7 +742,7 @@ async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::Local
|
||||
ancestor_start_lsn: start_lsn,
|
||||
pg_version: None,
|
||||
};
|
||||
let timeline_info = attachment_service
|
||||
let timeline_info = storage_controller
|
||||
.tenant_timeline_create(tenant_id, create_req)
|
||||
.await?;
|
||||
|
||||
@@ -767,7 +771,7 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re
|
||||
|
||||
match sub_name {
|
||||
"list" => {
|
||||
// TODO(sharding): this command shouldn't have to specify a shard ID: we should ask the attachment service
|
||||
// TODO(sharding): this command shouldn't have to specify a shard ID: we should ask the storage controller
|
||||
// where shard 0 is attached, and query there.
|
||||
let tenant_shard_id = get_tenant_shard_id(sub_args, env)?;
|
||||
let timeline_infos = get_timeline_infos(env, &tenant_shard_id)
|
||||
@@ -952,21 +956,21 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re
|
||||
(
|
||||
vec![(parsed.0, parsed.1.unwrap_or(5432))],
|
||||
// If caller is telling us what pageserver to use, this is not a tenant which is
|
||||
// full managed by attachment service, therefore not sharded.
|
||||
// full managed by storage controller, therefore not sharded.
|
||||
ShardParameters::DEFAULT_STRIPE_SIZE,
|
||||
)
|
||||
} else {
|
||||
// Look up the currently attached location of the tenant, and its striping metadata,
|
||||
// to pass these on to postgres.
|
||||
let attachment_service = AttachmentService::from_env(env);
|
||||
let locate_result = attachment_service.tenant_locate(endpoint.tenant_id).await?;
|
||||
let storage_controller = StorageController::from_env(env);
|
||||
let locate_result = storage_controller.tenant_locate(endpoint.tenant_id).await?;
|
||||
let pageservers = locate_result
|
||||
.shards
|
||||
.into_iter()
|
||||
.map(|shard| {
|
||||
(
|
||||
Host::parse(&shard.listen_pg_addr)
|
||||
.expect("Attachment service reported bad hostname"),
|
||||
.expect("Storage controller reported bad hostname"),
|
||||
shard.listen_pg_port,
|
||||
)
|
||||
})
|
||||
@@ -1015,8 +1019,8 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re
|
||||
pageserver.pg_connection_config.port(),
|
||||
)]
|
||||
} else {
|
||||
let attachment_service = AttachmentService::from_env(env);
|
||||
attachment_service
|
||||
let storage_controller = StorageController::from_env(env);
|
||||
storage_controller
|
||||
.tenant_locate(endpoint.tenant_id)
|
||||
.await?
|
||||
.shards
|
||||
@@ -1024,7 +1028,7 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re
|
||||
.map(|shard| {
|
||||
(
|
||||
Host::parse(&shard.listen_pg_addr)
|
||||
.expect("Attachment service reported malformed host"),
|
||||
.expect("Storage controller reported malformed host"),
|
||||
shard.listen_pg_port,
|
||||
)
|
||||
})
|
||||
@@ -1100,9 +1104,8 @@ fn get_pageserver(env: &local_env::LocalEnv, args: &ArgMatches) -> Result<PageSe
|
||||
async fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
|
||||
match sub_match.subcommand() {
|
||||
Some(("start", subcommand_args)) => {
|
||||
let register = subcommand_args.get_one::<bool>("register").unwrap_or(&true);
|
||||
if let Err(e) = get_pageserver(env, subcommand_args)?
|
||||
.start(&pageserver_config_overrides(subcommand_args), *register)
|
||||
.start(&pageserver_config_overrides(subcommand_args))
|
||||
.await
|
||||
{
|
||||
eprintln!("pageserver start failed: {e}");
|
||||
@@ -1131,7 +1134,7 @@ async fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) ->
|
||||
}
|
||||
|
||||
if let Err(e) = pageserver
|
||||
.start(&pageserver_config_overrides(subcommand_args), false)
|
||||
.start(&pageserver_config_overrides(subcommand_args))
|
||||
.await
|
||||
{
|
||||
eprintln!("pageserver start failed: {e}");
|
||||
@@ -1144,8 +1147,8 @@ async fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) ->
|
||||
let scheduling = subcommand_args.get_one("scheduling");
|
||||
let availability = subcommand_args.get_one("availability");
|
||||
|
||||
let attachment_service = AttachmentService::from_env(env);
|
||||
attachment_service
|
||||
let storage_controller = StorageController::from_env(env);
|
||||
storage_controller
|
||||
.node_configure(NodeConfigureRequest {
|
||||
node_id: pageserver.conf.id,
|
||||
scheduling: scheduling.cloned(),
|
||||
@@ -1170,11 +1173,11 @@ async fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) ->
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_attachment_service(
|
||||
async fn handle_storage_controller(
|
||||
sub_match: &ArgMatches,
|
||||
env: &local_env::LocalEnv,
|
||||
) -> Result<()> {
|
||||
let svc = AttachmentService::from_env(env);
|
||||
let svc = StorageController::from_env(env);
|
||||
match sub_match.subcommand() {
|
||||
Some(("start", _start_match)) => {
|
||||
if let Err(e) = svc.start().await {
|
||||
@@ -1194,8 +1197,8 @@ async fn handle_attachment_service(
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
Some((sub_name, _)) => bail!("Unexpected attachment_service subcommand '{}'", sub_name),
|
||||
None => bail!("no attachment_service subcommand provided"),
|
||||
Some((sub_name, _)) => bail!("Unexpected storage_controller subcommand '{}'", sub_name),
|
||||
None => bail!("no storage_controller subcommand provided"),
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -1280,11 +1283,11 @@ async fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) ->
|
||||
|
||||
broker::start_broker_process(env).await?;
|
||||
|
||||
// Only start the attachment service if the pageserver is configured to need it
|
||||
// Only start the storage controller if the pageserver is configured to need it
|
||||
if env.control_plane_api.is_some() {
|
||||
let attachment_service = AttachmentService::from_env(env);
|
||||
if let Err(e) = attachment_service.start().await {
|
||||
eprintln!("attachment_service start failed: {:#}", e);
|
||||
let storage_controller = StorageController::from_env(env);
|
||||
if let Err(e) = storage_controller.start().await {
|
||||
eprintln!("storage_controller start failed: {:#}", e);
|
||||
try_stop_all(env, true).await;
|
||||
exit(1);
|
||||
}
|
||||
@@ -1293,7 +1296,7 @@ async fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) ->
|
||||
for ps_conf in &env.pageservers {
|
||||
let pageserver = PageServerNode::from_env(env, ps_conf);
|
||||
if let Err(e) = pageserver
|
||||
.start(&pageserver_config_overrides(sub_match), true)
|
||||
.start(&pageserver_config_overrides(sub_match))
|
||||
.await
|
||||
{
|
||||
eprintln!("pageserver {} start failed: {:#}", ps_conf.id, e);
|
||||
@@ -1356,9 +1359,9 @@ async fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) {
|
||||
}
|
||||
|
||||
if env.control_plane_api.is_some() {
|
||||
let attachment_service = AttachmentService::from_env(env);
|
||||
if let Err(e) = attachment_service.stop(immediate).await {
|
||||
eprintln!("attachment service stop failed: {e:#}");
|
||||
let storage_controller = StorageController::from_env(env);
|
||||
if let Err(e) = storage_controller.stop(immediate).await {
|
||||
eprintln!("storage controller stop failed: {e:#}");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1586,6 +1589,7 @@ fn cli() -> Command {
|
||||
.about("Increase the number of shards in the tenant")
|
||||
.arg(tenant_id_arg.clone())
|
||||
.arg(Arg::new("shard-count").value_parser(value_parser!(u8)).long("shard-count").action(ArgAction::Set).help("Number of shards in the new tenant (default 1)"))
|
||||
.arg(Arg::new("shard-stripe-size").value_parser(value_parser!(u32)).long("shard-stripe-size").action(ArgAction::Set).help("Sharding stripe size in pages"))
|
||||
)
|
||||
)
|
||||
.subcommand(
|
||||
@@ -1596,11 +1600,7 @@ fn cli() -> Command {
|
||||
.subcommand(Command::new("status"))
|
||||
.subcommand(Command::new("start")
|
||||
.about("Start local pageserver")
|
||||
.arg(pageserver_config_args.clone()).arg(Arg::new("register")
|
||||
.long("register")
|
||||
.default_value("true").required(false)
|
||||
.value_parser(value_parser!(bool))
|
||||
.value_name("register"))
|
||||
.arg(pageserver_config_args.clone())
|
||||
)
|
||||
.subcommand(Command::new("stop")
|
||||
.about("Stop local pageserver")
|
||||
@@ -1618,9 +1618,9 @@ fn cli() -> Command {
|
||||
)
|
||||
)
|
||||
.subcommand(
|
||||
Command::new("attachment_service")
|
||||
Command::new("storage_controller")
|
||||
.arg_required_else_help(true)
|
||||
.about("Manage attachment_service")
|
||||
.about("Manage storage_controller")
|
||||
.subcommand(Command::new("start").about("Start local pageserver").arg(pageserver_config_args.clone()))
|
||||
.subcommand(Command::new("stop").about("Stop local pageserver")
|
||||
.arg(stop_mode_arg.clone()))
|
||||
|
||||
@@ -57,9 +57,9 @@ use serde::{Deserialize, Serialize};
|
||||
use url::Host;
|
||||
use utils::id::{NodeId, TenantId, TimelineId};
|
||||
|
||||
use crate::attachment_service::AttachmentService;
|
||||
use crate::local_env::LocalEnv;
|
||||
use crate::postgresql_conf::PostgresConf;
|
||||
use crate::storage_controller::StorageController;
|
||||
|
||||
use compute_api::responses::{ComputeState, ComputeStatus};
|
||||
use compute_api::spec::{Cluster, ComputeFeature, ComputeMode, ComputeSpec};
|
||||
@@ -750,17 +750,17 @@ impl Endpoint {
|
||||
let postgresql_conf = self.read_postgresql_conf()?;
|
||||
spec.cluster.postgresql_conf = Some(postgresql_conf);
|
||||
|
||||
// If we weren't given explicit pageservers, query the attachment service
|
||||
// If we weren't given explicit pageservers, query the storage controller
|
||||
if pageservers.is_empty() {
|
||||
let attachment_service = AttachmentService::from_env(&self.env);
|
||||
let locate_result = attachment_service.tenant_locate(self.tenant_id).await?;
|
||||
let storage_controller = StorageController::from_env(&self.env);
|
||||
let locate_result = storage_controller.tenant_locate(self.tenant_id).await?;
|
||||
pageservers = locate_result
|
||||
.shards
|
||||
.into_iter()
|
||||
.map(|shard| {
|
||||
(
|
||||
Host::parse(&shard.listen_pg_addr)
|
||||
.expect("Attachment service reported bad hostname"),
|
||||
.expect("Storage controller reported bad hostname"),
|
||||
shard.listen_pg_port,
|
||||
)
|
||||
})
|
||||
@@ -774,7 +774,10 @@ impl Endpoint {
|
||||
spec.shard_stripe_size = stripe_size.map(|s| s.0 as usize);
|
||||
}
|
||||
|
||||
let client = reqwest::Client::new();
|
||||
let client = reqwest::Client::builder()
|
||||
.timeout(Duration::from_secs(30))
|
||||
.build()
|
||||
.unwrap();
|
||||
let response = client
|
||||
.post(format!(
|
||||
"http://{}:{}/configure",
|
||||
|
||||
@@ -6,7 +6,6 @@
|
||||
//! local installations.
|
||||
#![deny(clippy::undocumented_unsafe_blocks)]
|
||||
|
||||
pub mod attachment_service;
|
||||
mod background_process;
|
||||
pub mod broker;
|
||||
pub mod endpoint;
|
||||
@@ -14,3 +13,4 @@ pub mod local_env;
|
||||
pub mod pageserver;
|
||||
pub mod postgresql_conf;
|
||||
pub mod safekeeper;
|
||||
pub mod storage_controller;
|
||||
|
||||
@@ -72,13 +72,13 @@ pub struct LocalEnv {
|
||||
#[serde(default)]
|
||||
pub safekeepers: Vec<SafekeeperConf>,
|
||||
|
||||
// Control plane upcall API for pageserver: if None, we will not run attachment_service. If set, this will
|
||||
// Control plane upcall API for pageserver: if None, we will not run storage_controller If set, this will
|
||||
// be propagated into each pageserver's configuration.
|
||||
#[serde(default)]
|
||||
pub control_plane_api: Option<Url>,
|
||||
|
||||
// Control plane upcall API for attachment service. If set, this will be propagated into the
|
||||
// attachment service's configuration.
|
||||
// Control plane upcall API for storage controller. If set, this will be propagated into the
|
||||
// storage controller's configuration.
|
||||
#[serde(default)]
|
||||
pub control_plane_compute_hook_api: Option<Url>,
|
||||
|
||||
@@ -227,10 +227,10 @@ impl LocalEnv {
|
||||
self.neon_distrib_dir.join("pageserver")
|
||||
}
|
||||
|
||||
pub fn attachment_service_bin(&self) -> PathBuf {
|
||||
// Irrespective of configuration, attachment service binary is always
|
||||
pub fn storage_controller_bin(&self) -> PathBuf {
|
||||
// Irrespective of configuration, storage controller binary is always
|
||||
// run from the same location as neon_local. This means that for compatibility
|
||||
// tests that run old pageserver/safekeeper, they still run latest attachment service.
|
||||
// tests that run old pageserver/safekeeper, they still run latest storage controller.
|
||||
let neon_local_bin_dir = env::current_exe().unwrap().parent().unwrap().to_owned();
|
||||
neon_local_bin_dir.join("storage_controller")
|
||||
}
|
||||
|
||||
@@ -17,7 +17,6 @@ use std::time::Duration;
|
||||
use anyhow::{bail, Context};
|
||||
use camino::Utf8PathBuf;
|
||||
use futures::SinkExt;
|
||||
use pageserver_api::controller_api::NodeRegisterRequest;
|
||||
use pageserver_api::models::{
|
||||
self, LocationConfig, ShardParameters, TenantHistorySize, TenantInfo, TimelineInfo,
|
||||
};
|
||||
@@ -31,7 +30,6 @@ use utils::{
|
||||
lsn::Lsn,
|
||||
};
|
||||
|
||||
use crate::attachment_service::AttachmentService;
|
||||
use crate::local_env::PageServerConf;
|
||||
use crate::{background_process, local_env::LocalEnv};
|
||||
|
||||
@@ -111,7 +109,7 @@ impl PageServerNode {
|
||||
control_plane_api.as_str()
|
||||
));
|
||||
|
||||
// Attachment service uses the same auth as pageserver: if JWT is enabled
|
||||
// Storage controller uses the same auth as pageserver: if JWT is enabled
|
||||
// for us, we will also need it to talk to them.
|
||||
if matches!(self.conf.http_auth_type, AuthType::NeonJWT) {
|
||||
let jwt_token = self
|
||||
@@ -163,8 +161,8 @@ impl PageServerNode {
|
||||
.expect("non-Unicode path")
|
||||
}
|
||||
|
||||
pub async fn start(&self, config_overrides: &[&str], register: bool) -> anyhow::Result<()> {
|
||||
self.start_node(config_overrides, false, register).await
|
||||
pub async fn start(&self, config_overrides: &[&str]) -> anyhow::Result<()> {
|
||||
self.start_node(config_overrides, false).await
|
||||
}
|
||||
|
||||
fn pageserver_init(&self, config_overrides: &[&str]) -> anyhow::Result<()> {
|
||||
@@ -202,6 +200,28 @@ impl PageServerNode {
|
||||
String::from_utf8_lossy(&init_output.stderr),
|
||||
);
|
||||
|
||||
// Write metadata file, used by pageserver on startup to register itself with
|
||||
// the storage controller
|
||||
let metadata_path = datadir.join("metadata.json");
|
||||
|
||||
let (_http_host, http_port) =
|
||||
parse_host_port(&self.conf.listen_http_addr).expect("Unable to parse listen_http_addr");
|
||||
let http_port = http_port.unwrap_or(9898);
|
||||
// Intentionally hand-craft JSON: this acts as an implicit format compat test
|
||||
// in case the pageserver-side structure is edited, and reflects the real life
|
||||
// situation: the metadata is written by some other script.
|
||||
std::fs::write(
|
||||
metadata_path,
|
||||
serde_json::to_vec(&serde_json::json!({
|
||||
"host": "localhost",
|
||||
"port": self.pg_connection_config.port(),
|
||||
"http_host": "localhost",
|
||||
"http_port": http_port,
|
||||
}))
|
||||
.unwrap(),
|
||||
)
|
||||
.expect("Failed to write metadata file");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -209,27 +229,7 @@ impl PageServerNode {
|
||||
&self,
|
||||
config_overrides: &[&str],
|
||||
update_config: bool,
|
||||
register: bool,
|
||||
) -> anyhow::Result<()> {
|
||||
// Register the node with the storage controller before starting pageserver: pageserver must be registered to
|
||||
// successfully call /re-attach and finish starting up.
|
||||
if register {
|
||||
let attachment_service = AttachmentService::from_env(&self.env);
|
||||
let (pg_host, pg_port) =
|
||||
parse_host_port(&self.conf.listen_pg_addr).expect("Unable to parse listen_pg_addr");
|
||||
let (http_host, http_port) = parse_host_port(&self.conf.listen_http_addr)
|
||||
.expect("Unable to parse listen_http_addr");
|
||||
attachment_service
|
||||
.node_register(NodeRegisterRequest {
|
||||
node_id: self.conf.id,
|
||||
listen_pg_addr: pg_host.to_string(),
|
||||
listen_pg_port: pg_port.unwrap_or(5432),
|
||||
listen_http_addr: http_host.to_string(),
|
||||
listen_http_port: http_port.unwrap_or(80),
|
||||
})
|
||||
.await?;
|
||||
}
|
||||
|
||||
// TODO: using a thread here because start_process() is not async but we need to call check_status()
|
||||
let datadir = self.repo_path();
|
||||
print!(
|
||||
|
||||
@@ -10,7 +10,7 @@ use pageserver_api::{
|
||||
TenantCreateRequest, TenantShardSplitRequest, TenantShardSplitResponse,
|
||||
TimelineCreateRequest, TimelineInfo,
|
||||
},
|
||||
shard::TenantShardId,
|
||||
shard::{ShardStripeSize, TenantShardId},
|
||||
};
|
||||
use pageserver_client::mgmt_api::ResponseErrorMessageExt;
|
||||
use postgres_backend::AuthType;
|
||||
@@ -24,7 +24,7 @@ use utils::{
|
||||
id::{NodeId, TenantId},
|
||||
};
|
||||
|
||||
pub struct AttachmentService {
|
||||
pub struct StorageController {
|
||||
env: LocalEnv,
|
||||
listen: String,
|
||||
path: Utf8PathBuf,
|
||||
@@ -36,7 +36,7 @@ pub struct AttachmentService {
|
||||
|
||||
const COMMAND: &str = "storage_controller";
|
||||
|
||||
const ATTACHMENT_SERVICE_POSTGRES_VERSION: u32 = 16;
|
||||
const STORAGE_CONTROLLER_POSTGRES_VERSION: u32 = 16;
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct AttachHookRequest {
|
||||
@@ -59,7 +59,7 @@ pub struct InspectResponse {
|
||||
pub attachment: Option<(u32, NodeId)>,
|
||||
}
|
||||
|
||||
impl AttachmentService {
|
||||
impl StorageController {
|
||||
pub fn from_env(env: &LocalEnv) -> Self {
|
||||
let path = Utf8PathBuf::from_path_buf(env.base_data_dir.clone())
|
||||
.unwrap()
|
||||
@@ -136,27 +136,27 @@ impl AttachmentService {
|
||||
}
|
||||
|
||||
fn pid_file(&self) -> Utf8PathBuf {
|
||||
Utf8PathBuf::from_path_buf(self.env.base_data_dir.join("attachment_service.pid"))
|
||||
Utf8PathBuf::from_path_buf(self.env.base_data_dir.join("storage_controller.pid"))
|
||||
.expect("non-Unicode path")
|
||||
}
|
||||
|
||||
/// PIDFile for the postgres instance used to store attachment service state
|
||||
/// PIDFile for the postgres instance used to store storage controller state
|
||||
fn postgres_pid_file(&self) -> Utf8PathBuf {
|
||||
Utf8PathBuf::from_path_buf(
|
||||
self.env
|
||||
.base_data_dir
|
||||
.join("attachment_service_postgres.pid"),
|
||||
.join("storage_controller_postgres.pid"),
|
||||
)
|
||||
.expect("non-Unicode path")
|
||||
}
|
||||
|
||||
/// Find the directory containing postgres binaries, such as `initdb` and `pg_ctl`
|
||||
///
|
||||
/// This usually uses ATTACHMENT_SERVICE_POSTGRES_VERSION of postgres, but will fall back
|
||||
/// This usually uses STORAGE_CONTROLLER_POSTGRES_VERSION of postgres, but will fall back
|
||||
/// to other versions if that one isn't found. Some automated tests create circumstances
|
||||
/// where only one version is available in pg_distrib_dir, such as `test_remote_extensions`.
|
||||
pub async fn get_pg_bin_dir(&self) -> anyhow::Result<Utf8PathBuf> {
|
||||
let prefer_versions = [ATTACHMENT_SERVICE_POSTGRES_VERSION, 15, 14];
|
||||
let prefer_versions = [STORAGE_CONTROLLER_POSTGRES_VERSION, 15, 14];
|
||||
|
||||
for v in prefer_versions {
|
||||
let path = Utf8PathBuf::from_path_buf(self.env.pg_bin_dir(v)?).unwrap();
|
||||
@@ -189,7 +189,7 @@ impl AttachmentService {
|
||||
///
|
||||
/// Returns the database url
|
||||
pub async fn setup_database(&self) -> anyhow::Result<String> {
|
||||
const DB_NAME: &str = "attachment_service";
|
||||
const DB_NAME: &str = "storage_controller";
|
||||
let database_url = format!("postgresql://localhost:{}/{DB_NAME}", self.postgres_port);
|
||||
|
||||
let pg_bin_dir = self.get_pg_bin_dir().await?;
|
||||
@@ -219,10 +219,10 @@ impl AttachmentService {
|
||||
}
|
||||
|
||||
pub async fn start(&self) -> anyhow::Result<()> {
|
||||
// Start a vanilla Postgres process used by the attachment service for persistence.
|
||||
// Start a vanilla Postgres process used by the storage controller for persistence.
|
||||
let pg_data_path = Utf8PathBuf::from_path_buf(self.env.base_data_dir.clone())
|
||||
.unwrap()
|
||||
.join("attachment_service_db");
|
||||
.join("storage_controller_db");
|
||||
let pg_bin_dir = self.get_pg_bin_dir().await?;
|
||||
let pg_log_path = pg_data_path.join("postgres.log");
|
||||
|
||||
@@ -245,7 +245,7 @@ impl AttachmentService {
|
||||
.await?;
|
||||
};
|
||||
|
||||
println!("Starting attachment service database...");
|
||||
println!("Starting storage controller database...");
|
||||
let db_start_args = [
|
||||
"-w",
|
||||
"-D",
|
||||
@@ -256,7 +256,7 @@ impl AttachmentService {
|
||||
];
|
||||
|
||||
background_process::start_process(
|
||||
"attachment_service_db",
|
||||
"storage_controller_db",
|
||||
&self.env.base_data_dir,
|
||||
pg_bin_dir.join("pg_ctl").as_std_path(),
|
||||
db_start_args,
|
||||
@@ -300,7 +300,7 @@ impl AttachmentService {
|
||||
background_process::start_process(
|
||||
COMMAND,
|
||||
&self.env.base_data_dir,
|
||||
&self.env.attachment_service_bin(),
|
||||
&self.env.storage_controller_bin(),
|
||||
args,
|
||||
[(
|
||||
"NEON_REPO_DIR".to_string(),
|
||||
@@ -322,10 +322,10 @@ impl AttachmentService {
|
||||
pub async fn stop(&self, immediate: bool) -> anyhow::Result<()> {
|
||||
background_process::stop_process(immediate, COMMAND, &self.pid_file())?;
|
||||
|
||||
let pg_data_path = self.env.base_data_dir.join("attachment_service_db");
|
||||
let pg_data_path = self.env.base_data_dir.join("storage_controller_db");
|
||||
let pg_bin_dir = self.get_pg_bin_dir().await?;
|
||||
|
||||
println!("Stopping attachment service database...");
|
||||
println!("Stopping storage controller database...");
|
||||
let pg_stop_args = ["-D", &pg_data_path.to_string_lossy(), "stop"];
|
||||
let stop_status = Command::new(pg_bin_dir.join("pg_ctl"))
|
||||
.args(pg_stop_args)
|
||||
@@ -344,10 +344,10 @@ impl AttachmentService {
|
||||
// fine that stop failed. Otherwise it is an error that stop failed.
|
||||
const PG_STATUS_NOT_RUNNING: i32 = 3;
|
||||
if Some(PG_STATUS_NOT_RUNNING) == status_exitcode.code() {
|
||||
println!("Attachment service data base is already stopped");
|
||||
println!("Storage controller database is already stopped");
|
||||
return Ok(());
|
||||
} else {
|
||||
anyhow::bail!("Failed to stop attachment service database: {stop_status}")
|
||||
anyhow::bail!("Failed to stop storage controller database: {stop_status}")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -368,7 +368,7 @@ impl AttachmentService {
|
||||
}
|
||||
}
|
||||
|
||||
/// Simple HTTP request wrapper for calling into attachment service
|
||||
/// Simple HTTP request wrapper for calling into storage controller
|
||||
async fn dispatch<RQ, RS>(
|
||||
&self,
|
||||
method: hyper::Method,
|
||||
@@ -496,11 +496,15 @@ impl AttachmentService {
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
new_shard_count: u8,
|
||||
new_stripe_size: Option<ShardStripeSize>,
|
||||
) -> anyhow::Result<TenantShardSplitResponse> {
|
||||
self.dispatch(
|
||||
Method::PUT,
|
||||
format!("control/v1/tenant/{tenant_id}/shard_split"),
|
||||
Some(TenantShardSplitRequest { new_shard_count }),
|
||||
Some(TenantShardSplitRequest {
|
||||
new_shard_count,
|
||||
new_stripe_size,
|
||||
}),
|
||||
)
|
||||
.await
|
||||
}
|
||||
@@ -70,9 +70,9 @@ Should only be used e.g. for status check/tenant creation/list.
|
||||
Should only be used e.g. for status check.
|
||||
Currently also used for connection from any pageserver to any safekeeper.
|
||||
|
||||
"generations_api": Provides access to the upcall APIs served by the attachment service or the control plane.
|
||||
"generations_api": Provides access to the upcall APIs served by the storage controller or the control plane.
|
||||
|
||||
"admin": Provides access to the control plane and admin APIs of the attachment service.
|
||||
"admin": Provides access to the control plane and admin APIs of the storage controller.
|
||||
|
||||
### CLI
|
||||
CLI generates a key pair during call to `neon_local init` with the following commands:
|
||||
|
||||
@@ -88,8 +88,6 @@ impl FromStr for NodeAvailability {
|
||||
}
|
||||
}
|
||||
|
||||
/// FIXME: this is a duplicate of the type in the attachment_service crate, because the
|
||||
/// type needs to be defined with diesel traits in there.
|
||||
#[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq)]
|
||||
pub enum NodeSchedulingPolicy {
|
||||
Active,
|
||||
|
||||
@@ -198,6 +198,13 @@ pub struct TimelineCreateRequest {
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct TenantShardSplitRequest {
|
||||
pub new_shard_count: u8,
|
||||
|
||||
// A tenant's stripe size is only meaningful the first time their shard count goes
|
||||
// above 1: therefore during a split from 1->N shards, we may modify the stripe size.
|
||||
//
|
||||
// If this is set while the stripe count is being increased from an already >1 value,
|
||||
// then the request will fail with 400.
|
||||
pub new_stripe_size: Option<ShardStripeSize>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
|
||||
@@ -6,11 +6,18 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
use utils::id::NodeId;
|
||||
|
||||
use crate::shard::TenantShardId;
|
||||
use crate::{controller_api::NodeRegisterRequest, shard::TenantShardId};
|
||||
|
||||
/// Upcall message sent by the pageserver to the configured `control_plane_api` on
|
||||
/// startup.
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct ReAttachRequest {
|
||||
pub node_id: NodeId,
|
||||
|
||||
/// Optional inline self-registration: this is useful with the storage controller,
|
||||
/// if the node already has a node_id set.
|
||||
#[serde(skip_serializing_if = "Option::is_none", default)]
|
||||
pub register: Option<NodeRegisterRequest>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
|
||||
@@ -84,9 +84,6 @@ where
|
||||
info!("Handling request");
|
||||
}
|
||||
|
||||
// Take a copy of the path for error logging
|
||||
let path = request.uri().path().to_string();
|
||||
|
||||
// No special handling for panics here. There's a `tracing_panic_hook` from another
|
||||
// module to do that globally.
|
||||
let res = handler(request).await;
|
||||
@@ -113,7 +110,7 @@ where
|
||||
}
|
||||
Ok(response)
|
||||
}
|
||||
Err(err) => Ok(api_error_handler(err, Some(&path))),
|
||||
Err(err) => Ok(api_error_handler(err)),
|
||||
}
|
||||
}
|
||||
.instrument(request_span)
|
||||
|
||||
@@ -108,7 +108,7 @@ impl HttpErrorBody {
|
||||
|
||||
pub async fn route_error_handler(err: routerify::RouteError) -> Response<Body> {
|
||||
match err.downcast::<ApiError>() {
|
||||
Ok(api_error) => api_error_handler(*api_error, None),
|
||||
Ok(api_error) => api_error_handler(*api_error),
|
||||
Err(other_error) => {
|
||||
// We expect all the request handlers to return an ApiError, so this should
|
||||
// not be reached. But just in case.
|
||||
@@ -121,16 +121,12 @@ pub async fn route_error_handler(err: routerify::RouteError) -> Response<Body> {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn api_error_handler(api_error: ApiError, path: Option<&str>) -> Response<Body> {
|
||||
pub fn api_error_handler(api_error: ApiError) -> Response<Body> {
|
||||
// Print a stack trace for Internal Server errors
|
||||
|
||||
match api_error {
|
||||
ApiError::Forbidden(_) | ApiError::Unauthorized(_) => {
|
||||
warn!(
|
||||
"Error processing HTTP request: {api_error:#} {}{}",
|
||||
path.as_ref().map(|_| "at").unwrap_or(""),
|
||||
path.unwrap_or("")
|
||||
)
|
||||
warn!("Error processing HTTP request: {api_error:#}")
|
||||
}
|
||||
ApiError::ResourceUnavailable(_) => info!("Error processing HTTP request: {api_error:#}"),
|
||||
ApiError::NotFound(_) => info!("Error processing HTTP request: {api_error:#}"),
|
||||
|
||||
@@ -123,6 +123,12 @@ impl PageserverFeedback {
|
||||
rf.replytime = *PG_EPOCH - Duration::from_micros(-raw_time as u64);
|
||||
}
|
||||
}
|
||||
b"shard_number" => {
|
||||
let len = buf.get_i32();
|
||||
// TODO: this will be implemented in the next update,
|
||||
// for now, we just skip the value.
|
||||
buf.advance(len as usize);
|
||||
}
|
||||
_ => {
|
||||
let len = buf.get_i32();
|
||||
warn!(
|
||||
|
||||
@@ -7,8 +7,9 @@
|
||||
use anyhow::{anyhow, bail, ensure, Context, Result};
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use remote_storage::{RemotePath, RemoteStorageConfig};
|
||||
use serde;
|
||||
use serde::de::IntoDeserializer;
|
||||
use std::env;
|
||||
use std::{collections::HashMap, env};
|
||||
use storage_broker::Uri;
|
||||
use utils::crashsafe::path_with_suffix_extension;
|
||||
use utils::id::ConnectionId;
|
||||
@@ -304,6 +305,26 @@ impl<T> BuilderValue<T> {
|
||||
}
|
||||
}
|
||||
|
||||
// Certain metadata (e.g. externally-addressable name, AZ) is delivered
|
||||
// as a separate structure. This information is not neeed by the pageserver
|
||||
// itself, it is only used for registering the pageserver with the control
|
||||
// plane and/or storage controller.
|
||||
//
|
||||
#[derive(serde::Deserialize)]
|
||||
pub(crate) struct NodeMetadata {
|
||||
#[serde(rename = "host")]
|
||||
pub(crate) postgres_host: String,
|
||||
#[serde(rename = "port")]
|
||||
pub(crate) postgres_port: u16,
|
||||
pub(crate) http_host: String,
|
||||
pub(crate) http_port: u16,
|
||||
|
||||
// Deployment tools may write fields to the metadata file beyond what we
|
||||
// use in this type: this type intentionally only names fields that require.
|
||||
#[serde(flatten)]
|
||||
pub(crate) other: HashMap<String, serde_json::Value>,
|
||||
}
|
||||
|
||||
// needed to simplify config construction
|
||||
struct PageServerConfigBuilder {
|
||||
listen_pg_addr: BuilderValue<String>,
|
||||
@@ -761,6 +782,10 @@ impl PageServerConf {
|
||||
self.workdir.join("deletion")
|
||||
}
|
||||
|
||||
pub fn metadata_path(&self) -> Utf8PathBuf {
|
||||
self.workdir.join("metadata.json")
|
||||
}
|
||||
|
||||
pub fn deletion_list_path(&self, sequence: u64) -> Utf8PathBuf {
|
||||
// Encode a version in the filename, so that if we ever switch away from JSON we can
|
||||
// increment this.
|
||||
|
||||
@@ -2,6 +2,7 @@ use std::collections::HashMap;
|
||||
|
||||
use futures::Future;
|
||||
use pageserver_api::{
|
||||
controller_api::NodeRegisterRequest,
|
||||
shard::TenantShardId,
|
||||
upcall_api::{
|
||||
ReAttachRequest, ReAttachResponse, ValidateRequest, ValidateRequestTenant, ValidateResponse,
|
||||
@@ -12,7 +13,10 @@ use tokio_util::sync::CancellationToken;
|
||||
use url::Url;
|
||||
use utils::{backoff, generation::Generation, id::NodeId};
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
use crate::{
|
||||
config::{NodeMetadata, PageServerConf},
|
||||
virtual_file::on_fatal_io_error,
|
||||
};
|
||||
|
||||
/// The Pageserver's client for using the control plane API: this is a small subset
|
||||
/// of the overall control plane API, for dealing with generations (see docs/rfcs/025-generation-numbers.md)
|
||||
@@ -32,6 +36,7 @@ pub enum RetryForeverError {
|
||||
pub trait ControlPlaneGenerationsApi {
|
||||
fn re_attach(
|
||||
&self,
|
||||
conf: &PageServerConf,
|
||||
) -> impl Future<Output = Result<HashMap<TenantShardId, Generation>, RetryForeverError>> + Send;
|
||||
fn validate(
|
||||
&self,
|
||||
@@ -110,13 +115,59 @@ impl ControlPlaneClient {
|
||||
|
||||
impl ControlPlaneGenerationsApi for ControlPlaneClient {
|
||||
/// Block until we get a successful response, or error out if we are shut down
|
||||
async fn re_attach(&self) -> Result<HashMap<TenantShardId, Generation>, RetryForeverError> {
|
||||
async fn re_attach(
|
||||
&self,
|
||||
conf: &PageServerConf,
|
||||
) -> Result<HashMap<TenantShardId, Generation>, RetryForeverError> {
|
||||
let re_attach_path = self
|
||||
.base_url
|
||||
.join("re-attach")
|
||||
.expect("Failed to build re-attach path");
|
||||
|
||||
// Include registration content in the re-attach request if a metadata file is readable
|
||||
let metadata_path = conf.metadata_path();
|
||||
let register = match tokio::fs::read_to_string(&metadata_path).await {
|
||||
Ok(metadata_str) => match serde_json::from_str::<NodeMetadata>(&metadata_str) {
|
||||
Ok(m) => {
|
||||
// Since we run one time at startup, be generous in our logging and
|
||||
// dump all metadata.
|
||||
tracing::info!(
|
||||
"Loaded node metadata: postgres {}:{}, http {}:{}, other fields: {:?}",
|
||||
m.postgres_host,
|
||||
m.postgres_port,
|
||||
m.http_host,
|
||||
m.http_port,
|
||||
m.other
|
||||
);
|
||||
|
||||
Some(NodeRegisterRequest {
|
||||
node_id: conf.id,
|
||||
listen_pg_addr: m.postgres_host,
|
||||
listen_pg_port: m.postgres_port,
|
||||
listen_http_addr: m.http_host,
|
||||
listen_http_port: m.http_port,
|
||||
})
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!("Unreadable metadata in {metadata_path}: {e}");
|
||||
None
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
if e.kind() == std::io::ErrorKind::NotFound {
|
||||
// This is legal: we may have been deployed with some external script
|
||||
// doing registration for us.
|
||||
tracing::info!("Metadata file not found at {metadata_path}");
|
||||
} else {
|
||||
on_fatal_io_error(&e, &format!("Loading metadata at {metadata_path}"))
|
||||
}
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
let request = ReAttachRequest {
|
||||
node_id: self.node_id,
|
||||
register,
|
||||
};
|
||||
|
||||
fail::fail_point!("control-plane-client-re-attach");
|
||||
|
||||
@@ -831,7 +831,10 @@ mod test {
|
||||
}
|
||||
|
||||
impl ControlPlaneGenerationsApi for MockControlPlane {
|
||||
async fn re_attach(&self) -> Result<HashMap<TenantShardId, Generation>, RetryForeverError> {
|
||||
async fn re_attach(
|
||||
&self,
|
||||
_conf: &PageServerConf,
|
||||
) -> Result<HashMap<TenantShardId, Generation>, RetryForeverError> {
|
||||
unimplemented!()
|
||||
}
|
||||
async fn validate(
|
||||
|
||||
@@ -1151,7 +1151,12 @@ async fn tenant_shard_split_handler(
|
||||
|
||||
let new_shards = state
|
||||
.tenant_manager
|
||||
.shard_split(tenant_shard_id, ShardCount::new(req.new_shard_count), &ctx)
|
||||
.shard_split(
|
||||
tenant_shard_id,
|
||||
ShardCount::new(req.new_shard_count),
|
||||
req.new_stripe_size,
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
|
||||
@@ -2247,7 +2252,7 @@ pub fn make_router(
|
||||
.get("/v1/location_config", |r| {
|
||||
api_handler(r, list_location_config_handler)
|
||||
})
|
||||
.get("/v1/location_config/:tenant_id", |r| {
|
||||
.get("/v1/location_config/:tenant_shard_id", |r| {
|
||||
api_handler(r, get_location_config_handler)
|
||||
})
|
||||
.put(
|
||||
|
||||
@@ -4625,10 +4625,7 @@ mod tests {
|
||||
drop(guard);
|
||||
|
||||
// Pick a big LSN such that we query over all the changes.
|
||||
// Technically, u64::MAX - 1 is the largest LSN supported by the read path,
|
||||
// but there seems to be a bug on the non-vectored search path which surfaces
|
||||
// in that case.
|
||||
let reads_lsn = Lsn(u64::MAX - 1000);
|
||||
let reads_lsn = Lsn(u64::MAX - 1);
|
||||
|
||||
for read in reads {
|
||||
info!("Doing vectored read on {:?}", read);
|
||||
@@ -5145,4 +5142,23 @@ mod tests {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_read_at_max_lsn() -> anyhow::Result<()> {
|
||||
let harness = TenantHarness::create("test_read_at_max_lsn")?;
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
let tline = tenant
|
||||
.create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
|
||||
.await?;
|
||||
|
||||
let lsn = Lsn(0x10);
|
||||
bulk_insert_compact_gc(tline.clone(), &ctx, lsn, 50, 10000).await?;
|
||||
|
||||
let test_key = Key::from_hex("010000000033333333444444445500000000").unwrap();
|
||||
let read_lsn = Lsn(u64::MAX - 1);
|
||||
|
||||
assert!(tline.get(test_key, read_lsn, &ctx).await.is_ok());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,7 +6,9 @@ use futures::stream::StreamExt;
|
||||
use itertools::Itertools;
|
||||
use pageserver_api::key::Key;
|
||||
use pageserver_api::models::ShardParameters;
|
||||
use pageserver_api::shard::{ShardCount, ShardIdentity, ShardNumber, TenantShardId};
|
||||
use pageserver_api::shard::{
|
||||
ShardCount, ShardIdentity, ShardNumber, ShardStripeSize, TenantShardId,
|
||||
};
|
||||
use rand::{distributions::Alphanumeric, Rng};
|
||||
use std::borrow::Cow;
|
||||
use std::cmp::Ordering;
|
||||
@@ -295,7 +297,7 @@ async fn init_load_generations(
|
||||
} else if let Some(client) = ControlPlaneClient::new(conf, cancel) {
|
||||
info!("Calling control plane API to re-attach tenants");
|
||||
// If we are configured to use the control plane API, then it is the source of truth for what tenants to load.
|
||||
match client.re_attach().await {
|
||||
match client.re_attach(conf).await {
|
||||
Ok(tenants) => tenants,
|
||||
Err(RetryForeverError::ShuttingDown) => {
|
||||
anyhow::bail!("Shut down while waiting for control plane re-attach response")
|
||||
@@ -1439,11 +1441,12 @@ impl TenantManager {
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
new_shard_count: ShardCount,
|
||||
new_stripe_size: Option<ShardStripeSize>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Vec<TenantShardId>> {
|
||||
let tenant = get_tenant(tenant_shard_id, true)?;
|
||||
|
||||
// Plan: identify what the new child shards will be
|
||||
// Validate the incoming request
|
||||
if new_shard_count.count() <= tenant_shard_id.shard_count.count() {
|
||||
anyhow::bail!("Requested shard count is not an increase");
|
||||
}
|
||||
@@ -1452,10 +1455,18 @@ impl TenantManager {
|
||||
anyhow::bail!("Requested split is not a power of two");
|
||||
}
|
||||
|
||||
let parent_shard_identity = tenant.shard_identity;
|
||||
let parent_tenant_conf = tenant.get_tenant_conf();
|
||||
let parent_generation = tenant.generation;
|
||||
if let Some(new_stripe_size) = new_stripe_size {
|
||||
if tenant.get_shard_stripe_size() != new_stripe_size
|
||||
&& tenant_shard_id.shard_count.count() > 1
|
||||
{
|
||||
// This tenant already has multiple shards, it is illegal to try and change its stripe size
|
||||
anyhow::bail!(
|
||||
"Shard stripe size may not be modified once tenant has multiple shards"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Plan: identify what the new child shards will be
|
||||
let child_shards = tenant_shard_id.split(new_shard_count);
|
||||
tracing::info!(
|
||||
"Shard {} splits into: {}",
|
||||
@@ -1466,6 +1477,10 @@ impl TenantManager {
|
||||
.join(",")
|
||||
);
|
||||
|
||||
let parent_shard_identity = tenant.shard_identity;
|
||||
let parent_tenant_conf = tenant.get_tenant_conf();
|
||||
let parent_generation = tenant.generation;
|
||||
|
||||
// Phase 1: Write out child shards' remote index files, in the parent tenant's current generation
|
||||
if let Err(e) = tenant.split_prepare(&child_shards).await {
|
||||
// If [`Tenant::split_prepare`] fails, we must reload the tenant, because it might
|
||||
@@ -1515,6 +1530,9 @@ impl TenantManager {
|
||||
// Phase 3: Spawn the child shards
|
||||
for child_shard in &child_shards {
|
||||
let mut child_shard_identity = parent_shard_identity;
|
||||
if let Some(new_stripe_size) = new_stripe_size {
|
||||
child_shard_identity.stripe_size = new_stripe_size;
|
||||
}
|
||||
child_shard_identity.count = child_shard.shard_count;
|
||||
child_shard_identity.number = child_shard.shard_number;
|
||||
|
||||
|
||||
@@ -2478,7 +2478,7 @@ impl Timeline {
|
||||
// 'prev_lsn' tracks the last LSN that we were at in our search. It's used
|
||||
// to check that each iteration make some progress, to break infinite
|
||||
// looping if something goes wrong.
|
||||
let mut prev_lsn = Lsn(u64::MAX);
|
||||
let mut prev_lsn = None;
|
||||
|
||||
let mut result = ValueReconstructResult::Continue;
|
||||
let mut cont_lsn = Lsn(request_lsn.0 + 1);
|
||||
@@ -2498,18 +2498,20 @@ impl Timeline {
|
||||
MATERIALIZED_PAGE_CACHE_HIT.inc_by(1);
|
||||
return Ok(traversal_path);
|
||||
}
|
||||
if prev_lsn <= cont_lsn {
|
||||
// Didn't make any progress in last iteration. Error out to avoid
|
||||
// getting stuck in the loop.
|
||||
return Err(layer_traversal_error(format!(
|
||||
"could not find layer with more data for key {} at LSN {}, request LSN {}, ancestor {}",
|
||||
key,
|
||||
Lsn(cont_lsn.0 - 1),
|
||||
request_lsn,
|
||||
timeline.ancestor_lsn
|
||||
), traversal_path));
|
||||
if let Some(prev) = prev_lsn {
|
||||
if prev <= cont_lsn {
|
||||
// Didn't make any progress in last iteration. Error out to avoid
|
||||
// getting stuck in the loop.
|
||||
return Err(layer_traversal_error(format!(
|
||||
"could not find layer with more data for key {} at LSN {}, request LSN {}, ancestor {}",
|
||||
key,
|
||||
Lsn(cont_lsn.0 - 1),
|
||||
request_lsn,
|
||||
timeline.ancestor_lsn
|
||||
), traversal_path));
|
||||
}
|
||||
}
|
||||
prev_lsn = cont_lsn;
|
||||
prev_lsn = Some(cont_lsn);
|
||||
}
|
||||
ValueReconstructResult::Missing => {
|
||||
return Err(layer_traversal_error(
|
||||
@@ -2539,7 +2541,7 @@ impl Timeline {
|
||||
|
||||
timeline_owned = timeline.get_ready_ancestor_timeline(ctx).await?;
|
||||
timeline = &*timeline_owned;
|
||||
prev_lsn = Lsn(u64::MAX);
|
||||
prev_lsn = None;
|
||||
continue 'outer;
|
||||
}
|
||||
|
||||
|
||||
@@ -17,7 +17,7 @@ use pin_project_lite::pin_project;
|
||||
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, ReadBuf};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::{metrics::NUM_CLIENT_CONNECTION_GAUGE, serverless::tls_listener::AsyncAccept};
|
||||
use crate::metrics::NUM_CLIENT_CONNECTION_GAUGE;
|
||||
|
||||
pub struct ProxyProtocolAccept {
|
||||
pub incoming: AddrIncoming,
|
||||
@@ -331,15 +331,15 @@ impl<T: AsyncRead> AsyncRead for WithClientIp<T> {
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncAccept for ProxyProtocolAccept {
|
||||
type Connection = WithConnectionGuard<WithClientIp<AddrStream>>;
|
||||
impl Accept for ProxyProtocolAccept {
|
||||
type Conn = WithConnectionGuard<WithClientIp<AddrStream>>;
|
||||
|
||||
type Error = io::Error;
|
||||
|
||||
fn poll_accept(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<Option<Result<Self::Connection, Self::Error>>> {
|
||||
) -> Poll<Option<Result<Self::Conn, Self::Error>>> {
|
||||
let conn = ready!(Pin::new(&mut self.incoming).poll_accept(cx)?);
|
||||
tracing::info!(protocol = self.protocol, "accepted new TCP connection");
|
||||
let Some(conn) = conn else {
|
||||
|
||||
@@ -21,24 +21,19 @@ pub use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
use tokio_util::task::TaskTracker;

use crate::context::RequestMonitoring;
use crate::metrics::TLS_HANDSHAKE_FAILURES;
use crate::protocol2::{ProxyProtocolAccept, WithClientIp, WithConnectionGuard};
use crate::rate_limiter::EndpointRateLimiter;
use crate::serverless::backend::PoolingBackend;
use crate::{cancellation::CancellationHandler, config::ProxyConfig};
use futures::StreamExt;
use hyper::{
server::{
accept,
conn::{AddrIncoming, AddrStream},
},
server::conn::{AddrIncoming, AddrStream},
Body, Method, Request, Response,
};

use std::convert::Infallible;
use std::net::IpAddr;
use std::sync::Arc;
use std::task::Poll;
use std::{future::ready, sync::Arc};
use tls_listener::TlsListener;
use tokio::net::TcpListener;
use tokio_util::sync::CancellationToken;
@@ -105,19 +100,12 @@ pub async fn task_main(
let ws_connections = tokio_util::task::task_tracker::TaskTracker::new();
ws_connections.close(); // allows `ws_connections.wait to complete`

let tls_listener = TlsListener::new(tls_acceptor, addr_incoming).filter(|conn| {
if let Err(err) = conn {
error!(
protocol = "http",
"failed to accept TLS connection: {err:?}"
);
TLS_HANDSHAKE_FAILURES.inc();
ready(false)
} else {
info!(protocol = "http", "accepted new TLS connection");
ready(true)
}
});
let tls_listener = TlsListener::new(
tls_acceptor,
addr_incoming,
"http",
config.handshake_timeout,
);

let make_svc = hyper::service::make_service_fn(
|stream: &tokio_rustls::server::TlsStream<
@@ -174,7 +162,7 @@ pub async fn task_main(
},
);

hyper::Server::builder(accept::from_stream(tls_listener))
hyper::Server::builder(tls_listener)
.serve(make_svc)
.with_graceful_shutdown(cancellation_token.cancelled())
.await?;

@@ -12,6 +12,7 @@ use crate::{
CachedNodeInfo,
},
context::RequestMonitoring,
error::{ErrorKind, ReportableError, UserFacingError},
proxy::connect_compute::ConnectMechanism,
};

@@ -117,6 +118,30 @@ pub enum HttpConnError {
WakeCompute(#[from] WakeComputeError),
}

impl ReportableError for HttpConnError {
fn get_error_kind(&self) -> ErrorKind {
match self {
HttpConnError::ConnectionClosedAbruptly(_) => ErrorKind::Compute,
HttpConnError::ConnectionError(p) => p.get_error_kind(),
HttpConnError::GetAuthInfo(a) => a.get_error_kind(),
HttpConnError::AuthError(a) => a.get_error_kind(),
HttpConnError::WakeCompute(w) => w.get_error_kind(),
}
}
}

impl UserFacingError for HttpConnError {
fn to_string_client(&self) -> String {
match self {
HttpConnError::ConnectionClosedAbruptly(_) => self.to_string(),
HttpConnError::ConnectionError(p) => p.to_string(),
HttpConnError::GetAuthInfo(c) => c.to_string_client(),
HttpConnError::AuthError(c) => c.to_string_client(),
HttpConnError::WakeCompute(c) => c.to_string_client(),
}
}
}

struct TokioMechanism {
pool: Arc<GlobalConnPool<tokio_postgres::Client>>,
conn_info: ConnInfo,

@@ -119,16 +119,12 @@ impl<C: ClientInnerExt> EndpointConnPool<C> {
}
}

fn put(
pool: &RwLock<Self>,
conn_info: &ConnInfo,
client: ClientInner<C>,
) -> anyhow::Result<()> {
fn put(pool: &RwLock<Self>, conn_info: &ConnInfo, client: ClientInner<C>) {
let conn_id = client.conn_id;

if client.is_closed() {
info!(%conn_id, "pool: throwing away connection '{conn_info}' because connection is closed");
return Ok(());
return;
}
let global_max_conn = pool.read().global_pool_size_max_conns;
if pool
@@ -138,7 +134,7 @@ impl<C: ClientInnerExt> EndpointConnPool<C> {
>= global_max_conn
{
info!(%conn_id, "pool: throwing away connection '{conn_info}' because pool is full");
return Ok(());
return;
}

// return connection to the pool
@@ -172,8 +168,6 @@ impl<C: ClientInnerExt> EndpointConnPool<C> {
} else {
info!(%conn_id, "pool: throwing away connection '{conn_info}' because pool is full, total_conns={total_conns}");
}

Ok(())
}
}

@@ -612,13 +606,6 @@ impl<C: ClientInnerExt> Client<C> {
let inner = inner.as_mut().expect("client inner should not be removed");
(&mut inner.inner, Discard { pool, conn_info })
}

pub fn check_idle(&mut self, status: ReadyForQueryStatus) {
self.inner().1.check_idle(status)
}
pub fn discard(&mut self) {
self.inner().1.discard()
}
}

impl<C: ClientInnerExt> Discard<'_, C> {
@@ -660,7 +647,7 @@ impl<C: ClientInnerExt> Client<C> {
// return connection to the pool
return Some(move || {
let _span = current_span.enter();
let _ = EndpointConnPool::put(&conn_pool, &conn_info, client);
EndpointConnPool::put(&conn_pool, &conn_info, client);
});
}
None
@@ -739,7 +726,7 @@ mod tests {
{
let mut client = Client::new(create_inner(), conn_info.clone(), ep_pool.clone());
assert_eq!(0, pool.get_global_connections_count());
client.discard();
client.inner().1.discard();
// Discard should not add the connection from the pool.
assert_eq!(0, pool.get_global_connections_count());
}

@@ -1,7 +1,11 @@
use std::pin::pin;
use std::sync::Arc;

use anyhow::bail;
use futures::future::select;
use futures::future::try_join;
use futures::future::Either;
use futures::StreamExt;
use futures::TryFutureExt;
use hyper::body::HttpBody;
use hyper::header;
use hyper::http::HeaderName;
@@ -11,13 +15,16 @@ use hyper::StatusCode;
use hyper::{Body, HeaderMap, Request};
use serde_json::json;
use serde_json::Value;
use tokio::try_join;
use tokio::time;
use tokio_postgres::error::DbError;
use tokio_postgres::error::ErrorPosition;
use tokio_postgres::error::SqlState;
use tokio_postgres::GenericClient;
use tokio_postgres::IsolationLevel;
use tokio_postgres::NoTls;
use tokio_postgres::ReadyForQueryStatus;
use tokio_postgres::Transaction;
use tokio_util::sync::CancellationToken;
use tracing::error;
use tracing::info;
use url::Url;
@@ -30,9 +37,13 @@ use crate::auth::ComputeUserInfoParseError;
use crate::config::ProxyConfig;
use crate::config::TlsConfig;
use crate::context::RequestMonitoring;
use crate::error::ErrorKind;
use crate::error::ReportableError;
use crate::error::UserFacingError;
use crate::metrics::HTTP_CONTENT_LENGTH;
use crate::metrics::NUM_CONNECTION_REQUESTS_GAUGE;
use crate::proxy::NeonOptions;
use crate::serverless::backend::HttpConnError;
use crate::DbName;
use crate::RoleName;

@@ -40,6 +51,7 @@ use super::backend::PoolingBackend;
use super::conn_pool::ConnInfo;
use super::json::json_to_pg_text;
use super::json::pg_text_row_to_json;
use super::json::JsonConversionError;

#[derive(serde::Deserialize)]
#[serde(rename_all = "camelCase")]
@@ -110,6 +122,18 @@ pub enum ConnInfoError {
MalformedEndpoint,
}

impl ReportableError for ConnInfoError {
fn get_error_kind(&self) -> ErrorKind {
ErrorKind::User
}
}

impl UserFacingError for ConnInfoError {
fn to_string_client(&self) -> String {
self.to_string()
}
}

fn get_conn_info(
ctx: &mut RequestMonitoring,
headers: &HeaderMap,
@@ -194,108 +218,123 @@ pub async fn handle(
request: Request<Body>,
backend: Arc<PoolingBackend>,
) -> Result<Response<Body>, ApiError> {
let result = tokio::time::timeout(
config.http_config.request_timeout,
handle_inner(config, &mut ctx, request, backend),
)
.await;
let cancel = CancellationToken::new();
let cancel2 = cancel.clone();
let handle = tokio::spawn(async move {
time::sleep(config.http_config.request_timeout).await;
cancel2.cancel();
});

let result = handle_inner(cancel, config, &mut ctx, request, backend).await;
handle.abort();

let mut response = match result {
Ok(r) => match r {
Ok(r) => {
ctx.set_success();
r
}
Err(e) => {
// TODO: ctx.set_error_kind(e.get_error_type());

let mut message = format!("{:?}", e);
let db_error = e
.downcast_ref::<tokio_postgres::Error>()
.and_then(|e| e.as_db_error());
fn get<'a, T: serde::Serialize>(
db: Option<&'a DbError>,
x: impl FnOnce(&'a DbError) -> T,
) -> Value {
db.map(x)
.and_then(|t| serde_json::to_value(t).ok())
.unwrap_or_default()
}

if let Some(db_error) = db_error {
db_error.message().clone_into(&mut message);
}

let position = db_error.and_then(|db| db.position());
let (position, internal_position, internal_query) = match position {
Some(ErrorPosition::Original(position)) => (
Value::String(position.to_string()),
Value::Null,
Value::Null,
),
Some(ErrorPosition::Internal { position, query }) => (
Value::Null,
Value::String(position.to_string()),
Value::String(query.clone()),
),
None => (Value::Null, Value::Null, Value::Null),
};

let code = get(db_error, |db| db.code().code());
let severity = get(db_error, |db| db.severity());
let detail = get(db_error, |db| db.detail());
let hint = get(db_error, |db| db.hint());
let where_ = get(db_error, |db| db.where_());
let table = get(db_error, |db| db.table());
let column = get(db_error, |db| db.column());
let schema = get(db_error, |db| db.schema());
let datatype = get(db_error, |db| db.datatype());
let constraint = get(db_error, |db| db.constraint());
let file = get(db_error, |db| db.file());
let line = get(db_error, |db| db.line().map(|l| l.to_string()));
let routine = get(db_error, |db| db.routine());

error!(
?code,
"sql-over-http per-client task finished with an error: {e:#}"
);
// TODO: this shouldn't always be bad request.
json_response(
StatusCode::BAD_REQUEST,
json!({
"message": message,
"code": code,
"detail": detail,
"hint": hint,
"position": position,
"internalPosition": internal_position,
"internalQuery": internal_query,
"severity": severity,
"where": where_,
"table": table,
"column": column,
"schema": schema,
"dataType": datatype,
"constraint": constraint,
"file": file,
"line": line,
"routine": routine,
}),
)?
}
},
Err(_) => {
// TODO: when http error classification is done, distinguish between
// timeout on sql vs timeout in proxy/cplane
// ctx.set_error_kind(crate::error::ErrorKind::RateLimit);
Ok(r) => {
ctx.set_success();
r
}
Err(e @ SqlOverHttpError::Cancelled(_)) => {
let error_kind = e.get_error_kind();
ctx.set_error_kind(error_kind);

let message = format!(
"HTTP-Connection timed out, execution time exceeded {} seconds",
config.http_config.request_timeout.as_secs()
"Query cancelled, runtime exceeded. SQL queries over HTTP must not exceed {} seconds of runtime. Please consider using our websocket based connections",
config.http_config.request_timeout.as_secs_f64()
);
error!(message);

tracing::info!(
kind=error_kind.to_metric_label(),
error=%e,
msg=message,
"forwarding error to user"
);

json_response(
StatusCode::GATEWAY_TIMEOUT,
json!({ "message": message, "code": StatusCode::GATEWAY_TIMEOUT.as_u16() }),
StatusCode::BAD_REQUEST,
json!({ "message": message, "code": SqlState::PROTOCOL_VIOLATION.code() }),
)?
}
Err(e) => {
let error_kind = e.get_error_kind();
ctx.set_error_kind(error_kind);

let mut message = e.to_string_client();
let db_error = match &e {
SqlOverHttpError::ConnectCompute(HttpConnError::ConnectionError(e))
| SqlOverHttpError::Postgres(e) => e.as_db_error(),
_ => None,
};
fn get<'a, T: serde::Serialize>(
db: Option<&'a DbError>,
x: impl FnOnce(&'a DbError) -> T,
) -> Value {
db.map(x)
.and_then(|t| serde_json::to_value(t).ok())
.unwrap_or_default()
}

if let Some(db_error) = db_error {
db_error.message().clone_into(&mut message);
}

let position = db_error.and_then(|db| db.position());
let (position, internal_position, internal_query) = match position {
Some(ErrorPosition::Original(position)) => (
Value::String(position.to_string()),
Value::Null,
Value::Null,
),
Some(ErrorPosition::Internal { position, query }) => (
Value::Null,
Value::String(position.to_string()),
Value::String(query.clone()),
),
None => (Value::Null, Value::Null, Value::Null),
};

let code = get(db_error, |db| db.code().code());
let severity = get(db_error, |db| db.severity());
let detail = get(db_error, |db| db.detail());
let hint = get(db_error, |db| db.hint());
let where_ = get(db_error, |db| db.where_());
let table = get(db_error, |db| db.table());
let column = get(db_error, |db| db.column());
let schema = get(db_error, |db| db.schema());
let datatype = get(db_error, |db| db.datatype());
let constraint = get(db_error, |db| db.constraint());
let file = get(db_error, |db| db.file());
let line = get(db_error, |db| db.line().map(|l| l.to_string()));
let routine = get(db_error, |db| db.routine());

tracing::info!(
kind=error_kind.to_metric_label(),
error=%e,
msg=message,
"forwarding error to user"
);

// TODO: this shouldn't always be bad request.
json_response(
StatusCode::BAD_REQUEST,
json!({
"message": message,
"code": code,
"detail": detail,
"hint": hint,
"position": position,
"internalPosition": internal_position,
"internalQuery": internal_query,
"severity": severity,
"where": where_,
"table": table,
"column": column,
"schema": schema,
"dataType": datatype,
"constraint": constraint,
"file": file,
"line": line,
"routine": routine,
}),
)?
}
};
@@ -307,12 +346,101 @@ pub async fn handle(
Ok(response)
}

#[derive(Debug, thiserror::Error)]
pub enum SqlOverHttpError {
#[error("{0}")]
ReadPayload(#[from] ReadPayloadError),
#[error("{0}")]
ConnectCompute(#[from] HttpConnError),
#[error("{0}")]
ConnInfo(#[from] ConnInfoError),
#[error("request is too large (max is {MAX_REQUEST_SIZE} bytes)")]
RequestTooLarge,
#[error("response is too large (max is {MAX_RESPONSE_SIZE} bytes)")]
ResponseTooLarge,
#[error("invalid isolation level")]
InvalidIsolationLevel,
#[error("{0}")]
Postgres(#[from] tokio_postgres::Error),
#[error("{0}")]
JsonConversion(#[from] JsonConversionError),
#[error("{0}")]
Cancelled(SqlOverHttpCancel),
}

impl ReportableError for SqlOverHttpError {
fn get_error_kind(&self) -> ErrorKind {
match self {
SqlOverHttpError::ReadPayload(e) => e.get_error_kind(),
SqlOverHttpError::ConnectCompute(e) => e.get_error_kind(),
SqlOverHttpError::ConnInfo(e) => e.get_error_kind(),
SqlOverHttpError::RequestTooLarge => ErrorKind::User,
SqlOverHttpError::ResponseTooLarge => ErrorKind::User,
SqlOverHttpError::InvalidIsolationLevel => ErrorKind::User,
SqlOverHttpError::Postgres(p) => p.get_error_kind(),
SqlOverHttpError::JsonConversion(_) => ErrorKind::Postgres,
SqlOverHttpError::Cancelled(c) => c.get_error_kind(),
}
}
}

impl UserFacingError for SqlOverHttpError {
fn to_string_client(&self) -> String {
match self {
SqlOverHttpError::ReadPayload(p) => p.to_string(),
SqlOverHttpError::ConnectCompute(c) => c.to_string_client(),
SqlOverHttpError::ConnInfo(c) => c.to_string_client(),
SqlOverHttpError::RequestTooLarge => self.to_string(),
SqlOverHttpError::ResponseTooLarge => self.to_string(),
SqlOverHttpError::InvalidIsolationLevel => self.to_string(),
SqlOverHttpError::Postgres(p) => p.to_string(),
SqlOverHttpError::JsonConversion(_) => "could not parse postgres response".to_string(),
SqlOverHttpError::Cancelled(_) => self.to_string(),
}
}
}

#[derive(Debug, thiserror::Error)]
pub enum ReadPayloadError {
#[error("could not read the HTTP request body: {0}")]
Read(#[from] hyper::Error),
#[error("could not parse the HTTP request body: {0}")]
Parse(#[from] serde_json::Error),
}

impl ReportableError for ReadPayloadError {
fn get_error_kind(&self) -> ErrorKind {
match self {
ReadPayloadError::Read(_) => ErrorKind::ClientDisconnect,
ReadPayloadError::Parse(_) => ErrorKind::User,
}
}
}

#[derive(Debug, thiserror::Error)]
pub enum SqlOverHttpCancel {
#[error("query was cancelled")]
Postgres,
#[error("query was cancelled while stuck trying to connect to the database")]
Connect,
}

impl ReportableError for SqlOverHttpCancel {
fn get_error_kind(&self) -> ErrorKind {
match self {
SqlOverHttpCancel::Postgres => ErrorKind::RateLimit,
SqlOverHttpCancel::Connect => ErrorKind::ServiceRateLimit,
}
}
}

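// Illustrative sketch (not part of the diff above): the timeout-as-cancellation pattern that
// the new `handle`/`handle_inner` code relies on, reduced to a standalone example. The helper
// `do_work` and the concrete duration are assumptions for illustration only; the real code
// races the payload read, compute connection, and query execution against the same token.
use std::time::Duration;
use tokio_util::sync::CancellationToken;

async fn do_work() -> &'static str {
    tokio::time::sleep(Duration::from_millis(100)).await;
    "done"
}

async fn run_with_timeout(request_timeout: Duration) -> Result<&'static str, &'static str> {
    let cancel = CancellationToken::new();
    let cancel2 = cancel.clone();
    // Background timer: fire the token once the request timeout elapses.
    let timer = tokio::spawn(async move {
        tokio::time::sleep(request_timeout).await;
        cancel2.cancel();
    });
    // Race the work against cancellation, mirroring the select calls in handle_inner.
    let res = tokio::select! {
        out = do_work() => Ok(out),
        _ = cancel.cancelled() => Err("cancelled"),
    };
    timer.abort();
    res
}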
async fn handle_inner(
|
||||
cancel: CancellationToken,
|
||||
config: &'static ProxyConfig,
|
||||
ctx: &mut RequestMonitoring,
|
||||
request: Request<Body>,
|
||||
backend: Arc<PoolingBackend>,
|
||||
) -> anyhow::Result<Response<Body>> {
|
||||
) -> Result<Response<Body>, SqlOverHttpError> {
|
||||
let _request_gauge = NUM_CONNECTION_REQUESTS_GAUGE
|
||||
.with_label_values(&[ctx.protocol])
|
||||
.guard();
|
||||
@@ -345,7 +473,7 @@ async fn handle_inner(
|
||||
b"ReadUncommitted" => IsolationLevel::ReadUncommitted,
|
||||
b"ReadCommitted" => IsolationLevel::ReadCommitted,
|
||||
b"RepeatableRead" => IsolationLevel::RepeatableRead,
|
||||
_ => bail!("invalid isolation level"),
|
||||
_ => return Err(SqlOverHttpError::InvalidIsolationLevel),
|
||||
}),
|
||||
None => None,
|
||||
};
|
||||
@@ -363,19 +491,16 @@ async fn handle_inner(
|
||||
// we don't have a streaming request support yet so this is to prevent OOM
|
||||
// from a malicious user sending an extremely large request body
|
||||
if request_content_length > MAX_REQUEST_SIZE {
|
||||
return Err(anyhow::anyhow!(
|
||||
"request is too large (max is {MAX_REQUEST_SIZE} bytes)"
|
||||
));
|
||||
return Err(SqlOverHttpError::RequestTooLarge);
|
||||
}
|
||||
|
||||
let fetch_and_process_request = async {
|
||||
let body = hyper::body::to_bytes(request.into_body())
|
||||
.await
|
||||
.map_err(anyhow::Error::from)?;
|
||||
let body = hyper::body::to_bytes(request.into_body()).await?;
|
||||
info!(length = body.len(), "request payload read");
|
||||
let payload: Payload = serde_json::from_slice(&body)?;
|
||||
Ok::<Payload, anyhow::Error>(payload) // Adjust error type accordingly
|
||||
};
|
||||
Ok::<Payload, ReadPayloadError>(payload) // Adjust error type accordingly
|
||||
}
|
||||
.map_err(SqlOverHttpError::from);
|
||||
|
||||
let authenticate_and_connect = async {
|
||||
let keys = backend.authenticate(ctx, &conn_info).await?;
|
||||
@@ -385,11 +510,25 @@ async fn handle_inner(
|
||||
// not strictly necessary to mark success here,
|
||||
// but it's just insurance for if we forget it somewhere else
|
||||
ctx.latency_timer.success();
|
||||
Ok::<_, anyhow::Error>(client)
|
||||
};
|
||||
Ok::<_, HttpConnError>(client)
|
||||
}
|
||||
.map_err(SqlOverHttpError::from);
|
||||
|
||||
// Run both operations in parallel
|
||||
let (payload, mut client) = try_join!(fetch_and_process_request, authenticate_and_connect)?;
|
||||
let (payload, mut client) = match select(
|
||||
try_join(
|
||||
pin!(fetch_and_process_request),
|
||||
pin!(authenticate_and_connect),
|
||||
),
|
||||
pin!(cancel.cancelled()),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Either::Left((result, _cancelled)) => result?,
|
||||
Either::Right((_cancelled, _)) => {
|
||||
return Err(SqlOverHttpError::Cancelled(SqlOverHttpCancel::Connect))
|
||||
}
|
||||
};
|
||||
|
||||
let mut response = Response::builder()
|
||||
.status(StatusCode::OK)
|
||||
@@ -401,19 +540,64 @@ async fn handle_inner(
|
||||
let mut size = 0;
|
||||
let result = match payload {
|
||||
Payload::Single(stmt) => {
|
||||
let (status, results) =
|
||||
query_to_json(&*client, stmt, &mut 0, raw_output, default_array_mode)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
client.discard();
|
||||
e
|
||||
})?;
|
||||
client.check_idle(status);
|
||||
results
|
||||
let mut size = 0;
|
||||
let (inner, mut discard) = client.inner();
|
||||
let cancel_token = inner.cancel_token();
|
||||
let query = pin!(query_to_json(
|
||||
&*inner,
|
||||
stmt,
|
||||
&mut size,
|
||||
raw_output,
|
||||
default_array_mode
|
||||
));
|
||||
let cancelled = pin!(cancel.cancelled());
|
||||
let res = select(query, cancelled).await;
|
||||
match res {
|
||||
Either::Left((Ok((status, results)), _cancelled)) => {
|
||||
discard.check_idle(status);
|
||||
results
|
||||
}
|
||||
Either::Left((Err(e), _cancelled)) => {
|
||||
discard.discard();
|
||||
return Err(e);
|
||||
}
|
||||
Either::Right((_cancelled, query)) => {
|
||||
if let Err(err) = cancel_token.cancel_query(NoTls).await {
|
||||
tracing::error!(?err, "could not cancel query");
|
||||
}
|
||||
match time::timeout(time::Duration::from_millis(100), query).await {
|
||||
Ok(Ok((status, results))) => {
|
||||
discard.check_idle(status);
|
||||
results
|
||||
}
|
||||
Ok(Err(error)) => {
|
||||
let db_error = match &error {
|
||||
SqlOverHttpError::ConnectCompute(
|
||||
HttpConnError::ConnectionError(e),
|
||||
)
|
||||
| SqlOverHttpError::Postgres(e) => e.as_db_error(),
|
||||
_ => None,
|
||||
};
|
||||
|
||||
// if errored for some other reason, it might not be safe to return
|
||||
if !db_error.is_some_and(|e| *e.code() == SqlState::QUERY_CANCELED) {
|
||||
discard.discard();
|
||||
}
|
||||
|
||||
return Err(SqlOverHttpError::Cancelled(SqlOverHttpCancel::Postgres));
|
||||
}
|
||||
Err(_timeout) => {
|
||||
discard.discard();
|
||||
return Err(SqlOverHttpError::Cancelled(SqlOverHttpCancel::Postgres));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Payload::Batch(statements) => {
|
||||
info!("starting transaction");
|
||||
let (inner, mut discard) = client.inner();
|
||||
let cancel_token = inner.cancel_token();
|
||||
let mut builder = inner.build_transaction();
|
||||
if let Some(isolation_level) = txn_isolation_level {
|
||||
builder = builder.isolation_level(isolation_level);
|
||||
@@ -433,6 +617,7 @@ async fn handle_inner(
|
||||
})?;
|
||||
|
||||
let results = match query_batch(
|
||||
cancel.child_token(),
|
||||
&transaction,
|
||||
statements,
|
||||
&mut size,
|
||||
@@ -452,6 +637,15 @@ async fn handle_inner(
|
||||
discard.check_idle(status);
|
||||
results
|
||||
}
|
||||
Err(SqlOverHttpError::Cancelled(_)) => {
|
||||
if let Err(err) = cancel_token.cancel_query(NoTls).await {
|
||||
tracing::error!(?err, "could not cancel query");
|
||||
}
|
||||
// TODO: after cancelling, wait to see if we can get a status. maybe the connection is still safe.
|
||||
discard.discard();
|
||||
|
||||
return Err(SqlOverHttpError::Cancelled(SqlOverHttpCancel::Postgres));
|
||||
}
|
||||
Err(err) => {
|
||||
info!("rollback");
|
||||
let status = transaction.rollback().await.map_err(|e| {
|
||||
@@ -466,16 +660,10 @@ async fn handle_inner(
|
||||
};
|
||||
|
||||
if txn_read_only {
|
||||
response = response.header(
|
||||
TXN_READ_ONLY.clone(),
|
||||
HeaderValue::try_from(txn_read_only.to_string())?,
|
||||
);
|
||||
response = response.header(TXN_READ_ONLY.clone(), &HEADER_VALUE_TRUE);
|
||||
}
|
||||
if txn_deferrable {
|
||||
response = response.header(
|
||||
TXN_DEFERRABLE.clone(),
|
||||
HeaderValue::try_from(txn_deferrable.to_string())?,
|
||||
);
|
||||
response = response.header(TXN_DEFERRABLE.clone(), &HEADER_VALUE_TRUE);
|
||||
}
|
||||
if let Some(txn_isolation_level) = txn_isolation_level_raw {
|
||||
response = response.header(TXN_ISOLATION_LEVEL.clone(), txn_isolation_level);
|
||||
@@ -503,19 +691,37 @@ async fn handle_inner(
|
||||
}
|
||||
|
||||
async fn query_batch(
|
||||
cancel: CancellationToken,
|
||||
transaction: &Transaction<'_>,
|
||||
queries: BatchQueryData,
|
||||
total_size: &mut usize,
|
||||
raw_output: bool,
|
||||
array_mode: bool,
|
||||
) -> anyhow::Result<Vec<Value>> {
|
||||
) -> Result<Vec<Value>, SqlOverHttpError> {
|
||||
let mut results = Vec::with_capacity(queries.queries.len());
|
||||
let mut current_size = 0;
|
||||
for stmt in queries.queries {
|
||||
// TODO: maybe we should check that the transaction bit is set here
|
||||
let (_, values) =
|
||||
query_to_json(transaction, stmt, &mut current_size, raw_output, array_mode).await?;
|
||||
results.push(values);
|
||||
let query = pin!(query_to_json(
|
||||
transaction,
|
||||
stmt,
|
||||
&mut current_size,
|
||||
raw_output,
|
||||
array_mode
|
||||
));
|
||||
let cancelled = pin!(cancel.cancelled());
|
||||
let res = select(query, cancelled).await;
|
||||
match res {
|
||||
// TODO: maybe we should check that the transaction bit is set here
|
||||
Either::Left((Ok((_, values)), _cancelled)) => {
|
||||
results.push(values);
|
||||
}
|
||||
Either::Left((Err(e), _cancelled)) => {
|
||||
return Err(e);
|
||||
}
|
||||
Either::Right((_cancelled, _)) => {
|
||||
return Err(SqlOverHttpError::Cancelled(SqlOverHttpCancel::Postgres));
|
||||
}
|
||||
}
|
||||
}
|
||||
*total_size += current_size;
|
||||
Ok(results)
|
||||
@@ -527,7 +733,7 @@ async fn query_to_json<T: GenericClient>(
|
||||
current_size: &mut usize,
|
||||
raw_output: bool,
|
||||
default_array_mode: bool,
|
||||
) -> anyhow::Result<(ReadyForQueryStatus, Value)> {
|
||||
) -> Result<(ReadyForQueryStatus, Value), SqlOverHttpError> {
|
||||
info!("executing query");
|
||||
let query_params = data.params;
|
||||
let mut row_stream = std::pin::pin!(client.query_raw_txt(&data.query, query_params).await?);
|
||||
@@ -544,9 +750,7 @@ async fn query_to_json<T: GenericClient>(
|
||||
// we don't have a streaming response support yet so this is to prevent OOM
|
||||
// from a malicious query (eg a cross join)
|
||||
if *current_size > MAX_RESPONSE_SIZE {
|
||||
return Err(anyhow::anyhow!(
|
||||
"response is too large (max is {MAX_RESPONSE_SIZE} bytes)"
|
||||
));
|
||||
return Err(SqlOverHttpError::ResponseTooLarge);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,186 +1,110 @@
|
||||
use std::{
|
||||
convert::Infallible,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use futures::{Future, Stream, StreamExt};
|
||||
use hyper::server::{accept::Accept, conn::AddrStream};
|
||||
use pin_project_lite::pin_project;
|
||||
use thiserror::Error;
|
||||
use tokio::{
|
||||
io::{AsyncRead, AsyncWrite},
|
||||
task::JoinSet,
|
||||
time::timeout,
|
||||
};
|
||||
use tokio_rustls::{server::TlsStream, TlsAcceptor};
|
||||
use tracing::{info, warn};
|
||||
|
||||
/// Default timeout for the TLS handshake.
|
||||
pub const DEFAULT_HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
use crate::{
|
||||
metrics::TLS_HANDSHAKE_FAILURES,
|
||||
protocol2::{WithClientIp, WithConnectionGuard},
|
||||
};
|
||||
|
||||
/// Trait for TLS implementation.
|
||||
///
|
||||
/// Implementations are provided by the rustls and native-tls features.
|
||||
pub trait AsyncTls<C: AsyncRead + AsyncWrite>: Clone {
|
||||
/// The type of the TLS stream created from the underlying stream.
|
||||
type Stream: Send + 'static;
|
||||
/// Error type for completing the TLS handshake
|
||||
type Error: std::error::Error + Send + 'static;
|
||||
/// Type of the Future for the TLS stream that is accepted.
|
||||
type AcceptFuture: Future<Output = Result<Self::Stream, Self::Error>> + Send + 'static;
|
||||
|
||||
/// Accept a TLS connection on an underlying stream
|
||||
fn accept(&self, stream: C) -> Self::AcceptFuture;
|
||||
pin_project! {
|
||||
/// Wraps a `Stream` of connections (such as a TCP listener) so that each connection is itself
|
||||
/// encrypted using TLS.
|
||||
pub(crate) struct TlsListener<A: Accept> {
|
||||
#[pin]
|
||||
listener: A,
|
||||
tls: TlsAcceptor,
|
||||
waiting: JoinSet<Option<TlsStream<A::Conn>>>,
|
||||
timeout: Duration,
|
||||
protocol: &'static str,
|
||||
}
|
||||
}
|
||||
|
||||
/// Asynchronously accept connections.
|
||||
pub trait AsyncAccept {
|
||||
/// The type of the connection that is accepted.
|
||||
type Connection: AsyncRead + AsyncWrite;
|
||||
/// The type of error that may be returned.
|
||||
type Error;
|
||||
|
||||
/// Poll to accept the next connection.
|
||||
fn poll_accept(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<Option<Result<Self::Connection, Self::Error>>>;
|
||||
|
||||
/// Return a new `AsyncAccept` that stops accepting connections after
|
||||
/// `ender` completes.
|
||||
///
|
||||
/// Useful for graceful shutdown.
|
||||
///
|
||||
/// See [examples/echo.rs](https://github.com/tmccombs/tls-listener/blob/main/examples/echo.rs)
|
||||
/// for example of how to use.
|
||||
fn until<F: Future>(self, ender: F) -> Until<Self, F>
|
||||
where
|
||||
Self: Sized,
|
||||
{
|
||||
Until {
|
||||
acceptor: self,
|
||||
ender,
|
||||
impl<A: Accept> TlsListener<A> {
|
||||
/// Create a `TlsListener` with default options.
|
||||
pub(crate) fn new(
|
||||
tls: TlsAcceptor,
|
||||
listener: A,
|
||||
protocol: &'static str,
|
||||
timeout: Duration,
|
||||
) -> Self {
|
||||
TlsListener {
|
||||
listener,
|
||||
tls,
|
||||
waiting: JoinSet::new(),
|
||||
timeout,
|
||||
protocol,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pin_project! {
|
||||
///
|
||||
/// Wraps a `Stream` of connections (such as a TCP listener) so that each connection is itself
|
||||
/// encrypted using TLS.
|
||||
///
|
||||
/// It is similar to:
|
||||
///
|
||||
/// ```ignore
|
||||
/// tcpListener.and_then(|s| tlsAcceptor.accept(s))
|
||||
/// ```
|
||||
///
|
||||
/// except that it has the ability to accept multiple transport-level connections
|
||||
/// simultaneously while the TLS handshake is pending for other connections.
|
||||
///
|
||||
/// By default, if a client fails the TLS handshake, that is treated as an error, and the
|
||||
/// `TlsListener` will return an `Err`. If the `TlsListener` is passed directly to a hyper
|
||||
/// [`Server`][1], then an invalid handshake can cause the server to stop accepting connections.
|
||||
/// See [`http-stream.rs`][2] or [`http-low-level`][3] examples, for examples of how to avoid this.
|
||||
///
|
||||
/// Note that if the maximum number of pending connections is greater than 1, the resulting
|
||||
/// [`T::Stream`][4] connections may come in a different order than the connections produced by the
|
||||
/// underlying listener.
|
||||
///
|
||||
/// [1]: https://docs.rs/hyper/latest/hyper/server/struct.Server.html
|
||||
/// [2]: https://github.com/tmccombs/tls-listener/blob/main/examples/http-stream.rs
|
||||
/// [3]: https://github.com/tmccombs/tls-listener/blob/main/examples/http-low-level.rs
|
||||
/// [4]: AsyncTls::Stream
|
||||
///
|
||||
#[allow(clippy::type_complexity)]
|
||||
pub struct TlsListener<A: AsyncAccept, T: AsyncTls<A::Connection>> {
|
||||
#[pin]
|
||||
listener: A,
|
||||
tls: T,
|
||||
waiting: JoinSet<Result<Result<T::Stream, T::Error>, tokio::time::error::Elapsed>>,
|
||||
timeout: Duration,
|
||||
}
|
||||
}
|
||||
|
||||
/// Builder for `TlsListener`.
|
||||
#[derive(Clone)]
|
||||
pub struct Builder<T> {
|
||||
tls: T,
|
||||
handshake_timeout: Duration,
|
||||
}
|
||||
|
||||
/// Wraps errors from either the listener or the TLS Acceptor
|
||||
#[derive(Debug, Error)]
|
||||
pub enum Error<LE: std::error::Error, TE: std::error::Error> {
|
||||
/// An error that arose from the listener ([AsyncAccept::Error])
|
||||
#[error("{0}")]
|
||||
ListenerError(#[source] LE),
|
||||
/// An error that occurred during the TLS accept handshake
|
||||
#[error("{0}")]
|
||||
TlsAcceptError(#[source] TE),
|
||||
}
|
||||
|
||||
impl<A: AsyncAccept, T> TlsListener<A, T>
|
||||
impl<A> Accept for TlsListener<A>
|
||||
where
|
||||
T: AsyncTls<A::Connection>,
|
||||
{
|
||||
/// Create a `TlsListener` with default options.
|
||||
pub fn new(tls: T, listener: A) -> Self {
|
||||
builder(tls).listen(listener)
|
||||
}
|
||||
}
|
||||
|
||||
impl<A, T> TlsListener<A, T>
|
||||
where
|
||||
A: AsyncAccept,
|
||||
A: Accept<Conn = WithConnectionGuard<WithClientIp<AddrStream>>>,
|
||||
A::Error: std::error::Error,
|
||||
T: AsyncTls<A::Connection>,
|
||||
A::Conn: AsyncRead + AsyncWrite + Unpin + Send + 'static,
|
||||
{
|
||||
/// Accept the next connection
|
||||
///
|
||||
/// This is essentially an alias to `self.next()` with a more domain-appropriate name.
|
||||
pub async fn accept(&mut self) -> Option<<Self as Stream>::Item>
|
||||
where
|
||||
Self: Unpin,
|
||||
{
|
||||
self.next().await
|
||||
}
|
||||
type Conn = TlsStream<A::Conn>;
|
||||
|
||||
/// Replaces the Tls Acceptor configuration, which will be used for new connections.
|
||||
///
|
||||
/// This can be used to change the certificate used at runtime.
|
||||
pub fn replace_acceptor(&mut self, acceptor: T) {
|
||||
self.tls = acceptor;
|
||||
}
|
||||
type Error = Infallible;
|
||||
|
||||
/// Replaces the Tls Acceptor configuration from a pinned reference to `Self`.
|
||||
///
|
||||
/// This is useful if your listener is `!Unpin`.
|
||||
///
|
||||
/// This can be used to change the certificate used at runtime.
|
||||
pub fn replace_acceptor_pin(self: Pin<&mut Self>, acceptor: T) {
|
||||
*self.project().tls = acceptor;
|
||||
}
|
||||
}
|
||||
|
||||
impl<A, T> Stream for TlsListener<A, T>
|
||||
where
|
||||
A: AsyncAccept,
|
||||
A::Error: std::error::Error,
|
||||
T: AsyncTls<A::Connection>,
|
||||
{
|
||||
type Item = Result<T::Stream, Error<A::Error, T::Error>>;
|
||||
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
fn poll_accept(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<Option<Result<Self::Conn, Self::Error>>> {
|
||||
let mut this = self.project();
|
||||
|
||||
loop {
|
||||
match this.listener.as_mut().poll_accept(cx) {
|
||||
Poll::Pending => break,
|
||||
Poll::Ready(Some(Ok(conn))) => {
|
||||
this.waiting
|
||||
.spawn(timeout(*this.timeout, this.tls.accept(conn)));
|
||||
Poll::Ready(Some(Ok(mut conn))) => {
|
||||
let t = *this.timeout;
|
||||
let tls = this.tls.clone();
|
||||
let protocol = *this.protocol;
|
||||
this.waiting.spawn(async move {
|
||||
let peer_addr = match conn.inner.wait_for_addr().await {
|
||||
Ok(Some(addr)) => addr,
|
||||
Err(e) => {
|
||||
tracing::error!("failed to accept TCP connection: invalid PROXY protocol V2 header: {e:#}");
|
||||
return None;
|
||||
}
|
||||
Ok(None) => conn.inner.inner.remote_addr()
|
||||
};
|
||||
|
||||
let accept = tls.accept(conn);
|
||||
match timeout(t, accept).await {
|
||||
Ok(Ok(conn)) => Some(conn),
|
||||
// The handshake failed, try getting another connection from the queue
|
||||
Ok(Err(e)) => {
|
||||
TLS_HANDSHAKE_FAILURES.inc();
|
||||
warn!(%peer_addr, protocol, "failed to accept TLS connection: {e:?}");
|
||||
None
|
||||
}
|
||||
// The handshake timed out, try getting another connection from the queue
|
||||
Err(_) => {
|
||||
TLS_HANDSHAKE_FAILURES.inc();
|
||||
warn!(%peer_addr, protocol, "failed to accept TLS connection: timeout");
|
||||
None
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
Poll::Ready(Some(Err(e))) => {
|
||||
return Poll::Ready(Some(Err(Error::ListenerError(e))));
|
||||
tracing::error!("error accepting TCP connection: {e}");
|
||||
continue;
|
||||
}
|
||||
Poll::Ready(None) => return Poll::Ready(None),
|
||||
}
|
||||
@@ -188,96 +112,19 @@ where
|
||||
|
||||
loop {
|
||||
return match this.waiting.poll_join_next(cx) {
|
||||
Poll::Ready(Some(Ok(Ok(conn)))) => {
|
||||
Poll::Ready(Some(conn.map_err(Error::TlsAcceptError)))
|
||||
Poll::Ready(Some(Ok(Some(conn)))) => {
|
||||
info!(protocol = this.protocol, "accepted new TLS connection");
|
||||
Poll::Ready(Some(Ok(conn)))
|
||||
}
|
||||
// The handshake timed out, try getting another connection from the queue
|
||||
Poll::Ready(Some(Ok(Err(_)))) => continue,
|
||||
// The handshake panicked
|
||||
Poll::Ready(Some(Err(e))) if e.is_panic() => {
|
||||
std::panic::resume_unwind(e.into_panic())
|
||||
// The handshake failed to complete, try getting another connection from the queue
|
||||
Poll::Ready(Some(Ok(None))) => continue,
|
||||
// The handshake panicked or was cancelled. ignore and get another connection
|
||||
Poll::Ready(Some(Err(e))) => {
|
||||
tracing::warn!("handshake aborted: {e}");
|
||||
continue;
|
||||
}
|
||||
// The handshake was externally aborted
|
||||
Poll::Ready(Some(Err(_))) => unreachable!("handshake tasks are never aborted"),
|
||||
_ => Poll::Pending,
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<C: AsyncRead + AsyncWrite + Unpin + Send + 'static> AsyncTls<C> for tokio_rustls::TlsAcceptor {
|
||||
type Stream = tokio_rustls::server::TlsStream<C>;
|
||||
type Error = std::io::Error;
|
||||
type AcceptFuture = tokio_rustls::Accept<C>;
|
||||
|
||||
fn accept(&self, conn: C) -> Self::AcceptFuture {
|
||||
tokio_rustls::TlsAcceptor::accept(self, conn)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Builder<T> {
|
||||
/// Set the timeout for handshakes.
|
||||
///
|
||||
/// If a timeout takes longer than `timeout`, then the handshake will be
|
||||
/// aborted and the underlying connection will be dropped.
|
||||
///
|
||||
/// Defaults to `DEFAULT_HANDSHAKE_TIMEOUT`.
|
||||
pub fn handshake_timeout(&mut self, timeout: Duration) -> &mut Self {
|
||||
self.handshake_timeout = timeout;
|
||||
self
|
||||
}
|
||||
|
||||
/// Create a `TlsListener` from the builder
|
||||
///
|
||||
/// Actually build the `TlsListener`. The `listener` argument should be
|
||||
/// an implementation of the `AsyncAccept` trait that accepts new connections
|
||||
/// that the `TlsListener` will encrypt using TLS.
|
||||
pub fn listen<A: AsyncAccept>(&self, listener: A) -> TlsListener<A, T>
|
||||
where
|
||||
T: AsyncTls<A::Connection>,
|
||||
{
|
||||
TlsListener {
|
||||
listener,
|
||||
tls: self.tls.clone(),
|
||||
waiting: JoinSet::new(),
|
||||
timeout: self.handshake_timeout,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a new Builder for a TlsListener
|
||||
///
|
||||
/// `server_config` will be used to configure the TLS sessions.
|
||||
pub fn builder<T>(tls: T) -> Builder<T> {
|
||||
Builder {
|
||||
tls,
|
||||
handshake_timeout: DEFAULT_HANDSHAKE_TIMEOUT,
|
||||
}
|
||||
}
|
||||
|
||||
pin_project! {
|
||||
/// See [`AsyncAccept::until`]
|
||||
pub struct Until<A, E> {
|
||||
#[pin]
|
||||
acceptor: A,
|
||||
#[pin]
|
||||
ender: E,
|
||||
}
|
||||
}
|
||||
|
||||
impl<A: AsyncAccept, E: Future> AsyncAccept for Until<A, E> {
|
||||
type Connection = A::Connection;
|
||||
type Error = A::Error;
|
||||
|
||||
fn poll_accept(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<Option<Result<Self::Connection, Self::Error>>> {
|
||||
let this = self.project();
|
||||
|
||||
match this.ender.poll(cx) {
|
||||
Poll::Pending => this.acceptor.poll_accept(cx),
|
||||
Poll::Ready(_) => Poll::Ready(None),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -140,6 +140,13 @@ pub static BROKER_ITERATION_TIMELINES: Lazy<Histogram> = Lazy::new(|| {
|
||||
)
|
||||
.expect("Failed to register safekeeper_broker_iteration_timelines histogram vec")
|
||||
});
|
||||
pub static RECEIVED_PS_FEEDBACKS: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!(
|
||||
"safekeeper_received_ps_feedbacks_total",
|
||||
"Number of pageserver feedbacks received"
|
||||
)
|
||||
.expect("Failed to register safekeeper_received_ps_feedbacks_total counter")
|
||||
});
|
||||
|
||||
pub const LABEL_UNKNOWN: &str = "unknown";
|
||||
|
||||
@@ -301,7 +308,8 @@ pub async fn time_io_closure<E: Into<anyhow::Error>>(
|
||||
#[derive(Clone)]
|
||||
pub struct FullTimelineInfo {
|
||||
pub ttid: TenantTimelineId,
|
||||
pub ps_feedback: PageserverFeedback,
|
||||
pub ps_feedback_count: u64,
|
||||
pub last_ps_feedback: PageserverFeedback,
|
||||
pub wal_backup_active: bool,
|
||||
pub timeline_is_active: bool,
|
||||
pub num_computes: u32,
|
||||
@@ -327,6 +335,7 @@ pub struct TimelineCollector {
|
||||
remote_consistent_lsn: GenericGaugeVec<AtomicU64>,
|
||||
ps_last_received_lsn: GenericGaugeVec<AtomicU64>,
|
||||
feedback_last_time_seconds: GenericGaugeVec<AtomicU64>,
|
||||
ps_feedback_count: GenericGaugeVec<AtomicU64>,
|
||||
timeline_active: GenericGaugeVec<AtomicU64>,
|
||||
wal_backup_active: GenericGaugeVec<AtomicU64>,
|
||||
connected_computes: IntGaugeVec,
|
||||
@@ -430,6 +439,15 @@ impl TimelineCollector {
|
||||
.unwrap();
|
||||
descs.extend(feedback_last_time_seconds.desc().into_iter().cloned());
|
||||
|
||||
let ps_feedback_count = GenericGaugeVec::new(
|
||||
Opts::new(
|
||||
"safekeeper_ps_feedback_count_total",
|
||||
"Number of feedbacks received from the pageserver",
|
||||
),
|
||||
&["tenant_id", "timeline_id"],
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let timeline_active = GenericGaugeVec::new(
|
||||
Opts::new(
|
||||
"safekeeper_timeline_active",
|
||||
@@ -538,6 +556,7 @@ impl TimelineCollector {
|
||||
remote_consistent_lsn,
|
||||
ps_last_received_lsn,
|
||||
feedback_last_time_seconds,
|
||||
ps_feedback_count,
|
||||
timeline_active,
|
||||
wal_backup_active,
|
||||
connected_computes,
|
||||
@@ -570,6 +589,7 @@ impl Collector for TimelineCollector {
|
||||
self.remote_consistent_lsn.reset();
|
||||
self.ps_last_received_lsn.reset();
|
||||
self.feedback_last_time_seconds.reset();
|
||||
self.ps_feedback_count.reset();
|
||||
self.timeline_active.reset();
|
||||
self.wal_backup_active.reset();
|
||||
self.connected_computes.reset();
|
||||
@@ -646,9 +666,12 @@ impl Collector for TimelineCollector {
|
||||
|
||||
self.ps_last_received_lsn
|
||||
.with_label_values(labels)
|
||||
.set(tli.ps_feedback.last_received_lsn.0);
|
||||
.set(tli.last_ps_feedback.last_received_lsn.0);
|
||||
self.ps_feedback_count
|
||||
.with_label_values(labels)
|
||||
.set(tli.ps_feedback_count);
|
||||
if let Ok(unix_time) = tli
|
||||
.ps_feedback
|
||||
.last_ps_feedback
|
||||
.replytime
|
||||
.duration_since(SystemTime::UNIX_EPOCH)
|
||||
{
|
||||
@@ -679,6 +702,7 @@ impl Collector for TimelineCollector {
|
||||
mfs.extend(self.remote_consistent_lsn.collect());
|
||||
mfs.extend(self.ps_last_received_lsn.collect());
|
||||
mfs.extend(self.feedback_last_time_seconds.collect());
|
||||
mfs.extend(self.ps_feedback_count.collect());
|
||||
mfs.extend(self.timeline_active.collect());
|
||||
mfs.extend(self.wal_backup_active.collect());
|
||||
mfs.extend(self.connected_computes.collect());
|
||||
|
||||
@@ -36,11 +36,15 @@ use tokio::time::Instant;
|
||||
use tracing::*;
|
||||
use utils::id::TenantTimelineId;
|
||||
use utils::lsn::Lsn;
|
||||
use utils::pageserver_feedback::PageserverFeedback;
|
||||
|
||||
const DEFAULT_FEEDBACK_CAPACITY: usize = 8;
|
||||
|
||||
/// Registry of WalReceivers (compute connections). Timeline holds it (wrapped
|
||||
/// in Arc).
|
||||
pub struct WalReceivers {
|
||||
mutex: Mutex<WalReceiversShared>,
|
||||
pageserver_feedback_tx: tokio::sync::broadcast::Sender<PageserverFeedback>,
|
||||
}
|
||||
|
||||
/// Id under which walreceiver is registered in shmem.
|
||||
@@ -48,8 +52,12 @@ type WalReceiverId = usize;
|
||||
|
||||
impl WalReceivers {
|
||||
pub fn new() -> Arc<WalReceivers> {
|
||||
let (pageserver_feedback_tx, _) =
|
||||
tokio::sync::broadcast::channel(DEFAULT_FEEDBACK_CAPACITY);
|
||||
|
||||
Arc::new(WalReceivers {
|
||||
mutex: Mutex::new(WalReceiversShared { slots: Vec::new() }),
|
||||
pageserver_feedback_tx,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -116,6 +124,12 @@ impl WalReceivers {
|
||||
let mut shared = self.mutex.lock();
|
||||
shared.slots[id] = None;
|
||||
}
|
||||
|
||||
/// Broadcast pageserver feedback to connected walproposers.
|
||||
pub fn broadcast_pageserver_feedback(&self, feedback: PageserverFeedback) {
|
||||
// Err means there is no subscribers, it is fine.
|
||||
let _ = self.pageserver_feedback_tx.send(feedback);
|
||||
}
|
||||
}
|
||||
|
||||
/// Only a few connections are expected (normally one), so store in Vec.
|
||||
@@ -197,17 +211,28 @@ impl SafekeeperPostgresHandler {
|
||||
// sends, so this avoids deadlocks.
|
||||
let mut pgb_reader = pgb.split().context("START_WAL_PUSH split")?;
|
||||
let peer_addr = *pgb.get_peer_addr();
|
||||
let network_reader = NetworkReader {
|
||||
let mut network_reader = NetworkReader {
|
||||
ttid: self.ttid,
|
||||
conn_id: self.conn_id,
|
||||
pgb_reader: &mut pgb_reader,
|
||||
peer_addr,
|
||||
acceptor_handle: &mut acceptor_handle,
|
||||
};
|
||||
let res = tokio::select! {
|
||||
// todo: add read|write .context to these errors
|
||||
r = network_reader.run(msg_tx, msg_rx, reply_tx) => r,
|
||||
r = network_write(pgb, reply_rx) => r,
|
||||
|
||||
// Read first message and create timeline if needed.
|
||||
let res = network_reader.read_first_message().await;
|
||||
|
||||
let res = if let Ok((tli, next_msg)) = res {
|
||||
let pageserver_feedback_rx: tokio::sync::broadcast::Receiver<PageserverFeedback> =
|
||||
tli.get_walreceivers().pageserver_feedback_tx.subscribe();
|
||||
|
||||
tokio::select! {
|
||||
// todo: add read|write .context to these errors
|
||||
r = network_reader.run(msg_tx, msg_rx, reply_tx, tli.clone(), next_msg) => r,
|
||||
r = network_write(pgb, reply_rx, pageserver_feedback_rx) => r,
|
||||
}
|
||||
} else {
|
||||
res.map(|_| ())
|
||||
};
|
||||
|
||||
// Join pg backend back.
|
||||
@@ -251,12 +276,9 @@ struct NetworkReader<'a, IO> {
|
||||
}
|
||||
|
||||
impl<'a, IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'a, IO> {
|
||||
async fn run(
|
||||
self,
|
||||
msg_tx: Sender<ProposerAcceptorMessage>,
|
||||
msg_rx: Receiver<ProposerAcceptorMessage>,
|
||||
reply_tx: Sender<AcceptorProposerMessage>,
|
||||
) -> Result<(), CopyStreamHandlerEnd> {
|
||||
async fn read_first_message(
|
||||
&mut self,
|
||||
) -> Result<(Arc<Timeline>, ProposerAcceptorMessage), CopyStreamHandlerEnd> {
|
||||
// Receive information about server to create timeline, if not yet.
|
||||
let next_msg = read_message(self.pgb_reader).await?;
|
||||
let tli = match next_msg {
|
||||
@@ -278,9 +300,19 @@ impl<'a, IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'a, IO> {
|
||||
)))
|
||||
}
|
||||
};
|
||||
Ok((tli, next_msg))
|
||||
}
|
||||
|
||||
async fn run(
|
||||
self,
|
||||
msg_tx: Sender<ProposerAcceptorMessage>,
|
||||
msg_rx: Receiver<ProposerAcceptorMessage>,
|
||||
reply_tx: Sender<AcceptorProposerMessage>,
|
||||
tli: Arc<Timeline>,
|
||||
next_msg: ProposerAcceptorMessage,
|
||||
) -> Result<(), CopyStreamHandlerEnd> {
|
||||
*self.acceptor_handle = Some(WalAcceptor::spawn(
|
||||
tli.clone(),
|
||||
tli,
|
||||
msg_rx,
|
||||
reply_tx,
|
||||
Some(self.conn_id),
|
||||
@@ -320,18 +352,46 @@ async fn read_network_loop<IO: AsyncRead + AsyncWrite + Unpin>(
|
||||
async fn network_write<IO: AsyncRead + AsyncWrite + Unpin>(
|
||||
pgb_writer: &mut PostgresBackend<IO>,
|
||||
mut reply_rx: Receiver<AcceptorProposerMessage>,
|
||||
mut pageserver_feedback_rx: tokio::sync::broadcast::Receiver<PageserverFeedback>,
|
||||
) -> Result<(), CopyStreamHandlerEnd> {
|
||||
let mut buf = BytesMut::with_capacity(128);
|
||||
|
||||
// storing append_response to inject PageserverFeedback into it
|
||||
let mut last_append_response = None;
|
||||
|
||||
loop {
|
||||
match reply_rx.recv().await {
|
||||
Some(msg) => {
|
||||
buf.clear();
|
||||
msg.serialize(&mut buf)?;
|
||||
pgb_writer.write_message(&BeMessage::CopyData(&buf)).await?;
|
||||
// trying to read either AcceptorProposerMessage or PageserverFeedback
|
||||
let msg = tokio::select! {
|
||||
reply = reply_rx.recv() => {
|
||||
if let Some(msg) = reply {
|
||||
if let AcceptorProposerMessage::AppendResponse(append_response) = &msg {
|
||||
last_append_response = Some(append_response.clone());
|
||||
}
|
||||
Some(msg)
|
||||
} else {
|
||||
return Ok(()); // chan closed, WalAcceptor terminated
|
||||
}
|
||||
}
|
||||
None => return Ok(()), // chan closed, WalAcceptor terminated
|
||||
}
|
||||
|
||||
feedback = pageserver_feedback_rx.recv() =>
|
||||
match (feedback, &last_append_response) {
|
||||
(Ok(feedback), Some(append_response)) => {
|
||||
// clone AppendResponse and inject PageserverFeedback into it
|
||||
let mut append_response = append_response.clone();
|
||||
append_response.pageserver_feedback = Some(feedback);
|
||||
Some(AcceptorProposerMessage::AppendResponse(append_response))
|
||||
}
|
||||
_ => None,
|
||||
}
|
||||
};
|
||||
|
||||
let Some(msg) = msg else {
|
||||
continue;
|
||||
};
|
||||
|
||||
buf.clear();
|
||||
msg.serialize(&mut buf)?;
|
||||
pgb_writer.write_message(&BeMessage::CopyData(&buf)).await?;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -321,7 +321,7 @@ pub struct AppendRequestHeader {
|
||||
}
|
||||
|
||||
/// Report safekeeper state to proposer
|
||||
#[derive(Debug, Serialize)]
|
||||
#[derive(Debug, Serialize, Clone)]
|
||||
pub struct AppendResponse {
|
||||
// Current term of the safekeeper; if it is higher than proposer's, the
|
||||
// compute is out of date.
|
||||
@@ -334,7 +334,7 @@ pub struct AppendResponse {
|
||||
// a criterion for walproposer --sync mode exit
|
||||
pub commit_lsn: Lsn,
|
||||
pub hs_feedback: HotStandbyFeedback,
|
||||
pub pageserver_feedback: PageserverFeedback,
|
||||
pub pageserver_feedback: Option<PageserverFeedback>,
|
||||
}
|
||||
|
||||
impl AppendResponse {
|
||||
@@ -344,7 +344,7 @@ impl AppendResponse {
|
||||
flush_lsn: Lsn(0),
|
||||
commit_lsn: Lsn(0),
|
||||
hs_feedback: HotStandbyFeedback::empty(),
|
||||
pageserver_feedback: PageserverFeedback::empty(),
|
||||
pageserver_feedback: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -462,7 +462,11 @@ impl AcceptorProposerMessage {
|
||||
buf.put_u64_le(msg.hs_feedback.xmin);
|
||||
buf.put_u64_le(msg.hs_feedback.catalog_xmin);
|
||||
|
||||
msg.pageserver_feedback.serialize(buf);
|
||||
// AsyncReadMessage in walproposer.c will not try to decode pageserver_feedback
|
||||
// if it is not present.
|
||||
if let Some(ref msg) = msg.pageserver_feedback {
|
||||
msg.serialize(buf);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -681,7 +685,7 @@ where
|
||||
commit_lsn: self.state.commit_lsn,
|
||||
// will be filled by the upper code to avoid bothering safekeeper
|
||||
hs_feedback: HotStandbyFeedback::empty(),
|
||||
pageserver_feedback: PageserverFeedback::empty(),
|
||||
pageserver_feedback: None,
|
||||
};
|
||||
trace!("formed AppendResponse {:?}", ar);
|
||||
ar
|
||||
|
||||
@@ -2,6 +2,8 @@
|
||||
//! with the "START_REPLICATION" message, and registry of walsenders.
|
||||
|
||||
use crate::handler::SafekeeperPostgresHandler;
|
||||
use crate::metrics::RECEIVED_PS_FEEDBACKS;
|
||||
use crate::receive_wal::WalReceivers;
|
||||
use crate::safekeeper::{Term, TermLsn};
|
||||
use crate::timeline::Timeline;
|
||||
use crate::wal_service::ConnectionId;
|
||||
@@ -21,7 +23,7 @@ use utils::failpoint_support;
|
||||
use utils::id::TenantTimelineId;
|
||||
use utils::pageserver_feedback::PageserverFeedback;
|
||||
|
||||
use std::cmp::{max, min};
|
||||
use std::cmp::min;
|
||||
use std::net::SocketAddr;
|
||||
use std::str;
|
||||
use std::sync::Arc;
|
||||
@@ -90,12 +92,14 @@ pub struct StandbyFeedback {
|
||||
/// WalSenders registry. Timeline holds it (wrapped in Arc).
|
||||
pub struct WalSenders {
|
||||
mutex: Mutex<WalSendersShared>,
|
||||
walreceivers: Arc<WalReceivers>,
|
||||
}
|
||||
|
||||
impl WalSenders {
|
||||
pub fn new() -> Arc<WalSenders> {
|
||||
pub fn new(walreceivers: Arc<WalReceivers>) -> Arc<WalSenders> {
|
||||
Arc::new(WalSenders {
|
||||
mutex: Mutex::new(WalSendersShared::new()),
|
||||
walreceivers,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -151,22 +155,29 @@ impl WalSenders {
|
||||
.min()
|
||||
}
|
||||
|
||||
/// Get aggregated pageserver feedback.
|
||||
pub fn get_ps_feedback(self: &Arc<WalSenders>) -> PageserverFeedback {
|
||||
self.mutex.lock().agg_ps_feedback
|
||||
/// Returns total counter of pageserver feedbacks received and last feedback.
|
||||
pub fn get_ps_feedback_stats(self: &Arc<WalSenders>) -> (u64, PageserverFeedback) {
|
||||
let shared = self.mutex.lock();
|
||||
(shared.ps_feedback_counter, shared.last_ps_feedback)
|
||||
}
|
||||
|
||||
/// Get aggregated pageserver and hot standby feedback (we send them to compute).
|
||||
pub fn get_feedbacks(self: &Arc<WalSenders>) -> (PageserverFeedback, HotStandbyFeedback) {
|
||||
let shared = self.mutex.lock();
|
||||
(shared.agg_ps_feedback, shared.agg_hs_feedback)
|
||||
/// Get aggregated hot standby feedback (we send it to compute).
|
||||
pub fn get_hotstandby(self: &Arc<WalSenders>) -> HotStandbyFeedback {
|
||||
self.mutex.lock().agg_hs_feedback
|
||||
}
|
||||
|
||||
/// Record new pageserver feedback, update aggregated values.
|
||||
fn record_ps_feedback(self: &Arc<WalSenders>, id: WalSenderId, feedback: &PageserverFeedback) {
|
||||
let mut shared = self.mutex.lock();
|
||||
shared.get_slot_mut(id).feedback = ReplicationFeedback::Pageserver(*feedback);
|
||||
shared.update_ps_feedback();
|
||||
shared.last_ps_feedback = *feedback;
|
||||
shared.ps_feedback_counter += 1;
|
||||
drop(shared);
|
||||
|
||||
RECEIVED_PS_FEEDBACKS.inc();
|
||||
|
||||
// send feedback to connected walproposers
|
||||
self.walreceivers.broadcast_pageserver_feedback(*feedback);
|
||||
}
|
||||
|
||||
/// Record standby reply.
|
||||
@@ -222,8 +233,10 @@ impl WalSenders {
|
||||
struct WalSendersShared {
|
||||
// aggregated over all walsenders value
|
||||
agg_hs_feedback: HotStandbyFeedback,
|
||||
// aggregated over all walsenders value
|
||||
agg_ps_feedback: PageserverFeedback,
|
||||
// last feedback ever received from any pageserver, empty if none
|
||||
last_ps_feedback: PageserverFeedback,
|
||||
// total counter of pageserver feedbacks received
|
||||
ps_feedback_counter: u64,
|
||||
slots: Vec<Option<WalSenderState>>,
|
||||
}
|
||||
|
||||
@@ -231,7 +244,8 @@ impl WalSendersShared {
|
||||
fn new() -> Self {
|
||||
WalSendersShared {
|
||||
agg_hs_feedback: HotStandbyFeedback::empty(),
|
||||
agg_ps_feedback: PageserverFeedback::empty(),
|
||||
last_ps_feedback: PageserverFeedback::empty(),
|
||||
ps_feedback_counter: 0,
|
||||
slots: Vec::new(),
|
||||
}
|
||||
}
|
||||
@@ -276,37 +290,6 @@ impl WalSendersShared {
|
||||
}
|
||||
self.agg_hs_feedback = agg;
|
||||
}
|
||||
|
||||
/// Update aggregated pageserver feedback. LSNs (last_received,
|
||||
/// disk_consistent, remote_consistent) and reply timestamp are just
|
||||
/// maximized; timeline_size if taken from feedback with highest
|
||||
/// last_received lsn. This is generally reasonable, but we might want to
|
||||
/// implement other policies once multiple pageservers start to be actively
|
||||
/// used.
|
||||
fn update_ps_feedback(&mut self) {
|
||||
let init = PageserverFeedback::empty();
|
||||
let acc =
|
||||
self.slots
|
||||
.iter()
|
||||
.flatten()
|
||||
.fold(init, |mut acc, ws_state| match ws_state.feedback {
|
||||
ReplicationFeedback::Pageserver(feedback) => {
|
||||
if feedback.last_received_lsn > acc.last_received_lsn {
|
||||
acc.current_timeline_size = feedback.current_timeline_size;
|
||||
}
|
||||
acc.last_received_lsn =
|
||||
max(feedback.last_received_lsn, acc.last_received_lsn);
|
||||
acc.disk_consistent_lsn =
|
||||
max(feedback.disk_consistent_lsn, acc.disk_consistent_lsn);
|
||||
acc.remote_consistent_lsn =
|
||||
max(feedback.remote_consistent_lsn, acc.remote_consistent_lsn);
|
||||
acc.replytime = max(feedback.replytime, acc.replytime);
|
||||
acc
|
||||
}
|
||||
ReplicationFeedback::Standby(_) => acc,
|
||||
});
|
||||
self.agg_ps_feedback = acc;
|
||||
}
|
||||
}
|
||||
|
||||
// Serialized is used only for pretty printing in json.
|
||||
@@ -443,7 +426,7 @@ impl SafekeeperPostgresHandler {
|
||||
};
|
||||
let mut reply_reader = ReplyReader {
|
||||
reader,
|
||||
ws_guard,
|
||||
ws_guard: ws_guard.clone(),
|
||||
tli,
|
||||
};
|
||||
|
||||
@@ -452,6 +435,18 @@ impl SafekeeperPostgresHandler {
|
||||
r = sender.run() => r,
|
||||
r = reply_reader.run() => r,
|
||||
};
|
||||
|
||||
let ws_state = ws_guard
|
||||
.walsenders
|
||||
.mutex
|
||||
.lock()
|
||||
.get_slot(ws_guard.id)
|
||||
.clone();
|
||||
info!(
|
||||
"finished streaming to {}, feedback={:?}",
|
||||
ws_state.addr, ws_state.feedback,
|
||||
);
|
||||
|
||||
// Join pg backend back.
|
||||
pgb.unsplit(reply_reader.reader)?;
|
||||
|
||||
@@ -733,7 +728,6 @@ async fn wait_for_lsn(
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use postgres_protocol::PG_EPOCH;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
|
||||
use super::*;
|
||||
@@ -792,27 +786,4 @@ mod tests {
|
||||
wss.update_hs_feedback();
|
||||
assert_eq!(wss.agg_hs_feedback.xmin, 42);
|
||||
}
|
||||
|
||||
// form pageserver feedback with the given last_received_lsn / timeline size and
// the rest set to dummy values.
|
||||
fn ps_feedback(current_timeline_size: u64, last_received_lsn: Lsn) -> ReplicationFeedback {
|
||||
ReplicationFeedback::Pageserver(PageserverFeedback {
|
||||
current_timeline_size,
|
||||
last_received_lsn,
|
||||
disk_consistent_lsn: Lsn::INVALID,
|
||||
remote_consistent_lsn: Lsn::INVALID,
|
||||
replytime: *PG_EPOCH,
|
||||
})
|
||||
}
|
||||
|
||||
// test that ps aggregation works as expected
|
||||
#[test]
|
||||
fn test_ps_feedback() {
|
||||
let mut wss = WalSendersShared::new();
|
||||
push_feedback(&mut wss, ps_feedback(8, Lsn(42)));
|
||||
push_feedback(&mut wss, ps_feedback(4, Lsn(84)));
|
||||
wss.update_ps_feedback();
|
||||
assert_eq!(wss.agg_ps_feedback.current_timeline_size, 4);
|
||||
assert_eq!(wss.agg_ps_feedback.last_received_lsn, Lsn(84));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -402,6 +402,7 @@ impl Timeline {
|
||||
)));
|
||||
let (cancellation_tx, cancellation_rx) = watch::channel(false);
|
||||
|
||||
let walreceivers = WalReceivers::new();
|
||||
Ok(Timeline {
|
||||
ttid,
|
||||
wal_backup_launcher_tx,
|
||||
@@ -410,8 +411,8 @@ impl Timeline {
|
||||
term_flush_lsn_watch_tx,
|
||||
term_flush_lsn_watch_rx,
|
||||
mutex: Mutex::new(shared_state),
|
||||
walsenders: WalSenders::new(),
|
||||
walreceivers: WalReceivers::new(),
|
||||
walsenders: WalSenders::new(walreceivers.clone()),
|
||||
walreceivers,
|
||||
cancellation_rx,
|
||||
cancellation_tx,
|
||||
timeline_dir: conf.timeline_dir(&ttid),
|
||||
@@ -435,6 +436,7 @@ impl Timeline {
|
||||
let state =
|
||||
TimelinePersistentState::new(&ttid, server_info, vec![], commit_lsn, local_start_lsn);
|
||||
|
||||
let walreceivers = WalReceivers::new();
|
||||
Ok(Timeline {
|
||||
ttid,
|
||||
wal_backup_launcher_tx,
|
||||
@@ -443,8 +445,8 @@ impl Timeline {
|
||||
term_flush_lsn_watch_tx,
|
||||
term_flush_lsn_watch_rx,
|
||||
mutex: Mutex::new(SharedState::create_new(conf, &ttid, state)?),
|
||||
walsenders: WalSenders::new(),
|
||||
walreceivers: WalReceivers::new(),
|
||||
walsenders: WalSenders::new(walreceivers.clone()),
|
||||
walreceivers,
|
||||
cancellation_rx,
|
||||
cancellation_tx,
|
||||
timeline_dir: conf.timeline_dir(&ttid),
|
||||
@@ -656,12 +658,9 @@ impl Timeline {
|
||||
let mut shared_state = self.write_shared_state().await;
|
||||
rmsg = shared_state.sk.process_msg(msg).await?;
|
||||
|
||||
// if this is AppendResponse, fill in proper pageserver and hot
|
||||
// standby feedback.
|
||||
// if this is AppendResponse, fill in proper hot standby feedback.
|
||||
if let Some(AcceptorProposerMessage::AppendResponse(ref mut resp)) = rmsg {
|
||||
let (ps_feedback, hs_feedback) = self.walsenders.get_feedbacks();
|
||||
resp.hs_feedback = hs_feedback;
|
||||
resp.pageserver_feedback = ps_feedback;
|
||||
resp.hs_feedback = self.walsenders.get_hotstandby();
|
||||
}
|
||||
|
||||
commit_lsn = shared_state.sk.state.inmem.commit_lsn;
|
||||
@@ -898,12 +897,13 @@ impl Timeline {
|
||||
return None;
|
||||
}
|
||||
|
||||
let ps_feedback = self.walsenders.get_ps_feedback();
|
||||
let (ps_feedback_count, last_ps_feedback) = self.walsenders.get_ps_feedback_stats();
|
||||
let state = self.write_shared_state().await;
|
||||
if state.active {
|
||||
Some(FullTimelineInfo {
|
||||
ttid: self.ttid,
|
||||
ps_feedback,
|
||||
ps_feedback_count,
|
||||
last_ps_feedback,
|
||||
wal_backup_active: state.wal_backup_active,
|
||||
timeline_is_active: state.active,
|
||||
num_computes: self.walreceivers.get_num() as u32,
|
||||
|
||||
@@ -519,9 +519,9 @@ class NeonEnvBuilder:
|
||||
self.env = NeonEnv(self)
|
||||
return self.env
|
||||
|
||||
def start(self):
|
||||
def start(self, register_pageservers=False):
|
||||
assert self.env is not None, "environment is not initialized yet, call init() first"
|
||||
self.env.start()
|
||||
self.env.start(register_pageservers=register_pageservers)
|
||||
|
||||
def init_start(
|
||||
self,
|
||||
@@ -1014,24 +1014,24 @@ class NeonEnv:
|
||||
self.initial_tenant = config.initial_tenant
|
||||
self.initial_timeline = config.initial_timeline
|
||||
|
||||
# Find two adjacent ports for attachment service and its postgres DB. This
|
||||
# Find two adjacent ports for storage controller and its postgres DB. This
|
||||
# loop would eventually throw from get_port() if we run out of ports (extremely
|
||||
# unlikely): usually we find two adjacent free ports on the first iteration.
|
||||
while True:
|
||||
self.attachment_service_port = self.port_distributor.get_port()
|
||||
attachment_service_pg_port = self.port_distributor.get_port()
|
||||
if attachment_service_pg_port == self.attachment_service_port + 1:
|
||||
self.storage_controller_port = self.port_distributor.get_port()
|
||||
storage_controller_pg_port = self.port_distributor.get_port()
|
||||
if storage_controller_pg_port == self.storage_controller_port + 1:
|
||||
break
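For readers skimming the fixture, this port-picking loop is just rejection sampling over a sequential port distributor. A hedged, stand-alone sketch of the same idea follows; PortDistributorSketch is a hypothetical stand-in for the real PortDistributor.

import itertools
from typing import Tuple

class PortDistributorSketch:
    """Hypothetical stand-in for the test framework's PortDistributor: hands out ports sequentially."""

    def __init__(self, base: int = 15000):
        self._next = base

    def get_port(self) -> int:
        port = self._next
        self._next += 1
        return port

def find_adjacent_ports(dist: PortDistributorSketch, max_attempts: int = 1000) -> Tuple[int, int]:
    # Keep drawing until the second port is exactly the first + 1; with a sequential
    # distributor this normally succeeds on the first iteration.
    for _ in itertools.repeat(None, max_attempts):
        api_port = dist.get_port()
        pg_port = dist.get_port()
        if pg_port == api_port + 1:
            return api_port, pg_port
    raise RuntimeError("could not find two adjacent free ports")

controller_port, controller_pg_port = find_adjacent_ports(PortDistributorSketch())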
|
||||
|
||||
# The URL for the pageserver to use as its control_plane_api config
|
||||
self.control_plane_api: str = f"http://127.0.0.1:{self.attachment_service_port}/upcall/v1"
|
||||
# The base URL of the attachment service
|
||||
self.attachment_service_api: str = f"http://127.0.0.1:{self.attachment_service_port}"
|
||||
self.control_plane_api: str = f"http://127.0.0.1:{self.storage_controller_port}/upcall/v1"
|
||||
# The base URL of the storage controller
|
||||
self.storage_controller_api: str = f"http://127.0.0.1:{self.storage_controller_port}"
|
||||
|
||||
# For testing this with a fake HTTP server, enable passing through a URL from config
|
||||
self.control_plane_compute_hook_api = config.control_plane_compute_hook_api
|
||||
|
||||
self.attachment_service: NeonAttachmentService = NeonAttachmentService(
|
||||
self.storage_controller: NeonStorageController = NeonStorageController(
|
||||
self, config.auth_enabled
|
||||
)
|
||||
|
||||
@@ -1112,17 +1112,22 @@ class NeonEnv:
|
||||
log.info(f"Config: {cfg}")
|
||||
self.neon_cli.init(cfg, force=config.config_init_force)
|
||||
|
||||
def start(self):
|
||||
# Attachment service starts first, so that pageserver /re-attach calls don't
|
||||
def start(self, register_pageservers=False):
|
||||
# storage controller starts first, so that pageserver /re-attach calls don't
|
||||
# bounce through retries on startup
|
||||
self.attachment_service.start()
|
||||
self.storage_controller.start()
|
||||
|
||||
def attachment_service_ready():
|
||||
assert self.attachment_service.ready() is True
|
||||
def storage_controller_ready():
|
||||
assert self.storage_controller.ready() is True
|
||||
|
||||
# Wait for attachment service readiness to prevent unnecessary post start-up
|
||||
# Wait for storage controller readiness to prevent unnecessary post start-up
|
||||
# reconcile.
|
||||
wait_until(30, 1, attachment_service_ready)
|
||||
wait_until(30, 1, storage_controller_ready)
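The readiness wait is a plain poll of the controller's /ready endpoint. A sketch of the same pattern with requests, assuming only that /ready returns 200 once startup reconciliation can proceed and 503 (or a refused connection) before that:

import time
import requests

def wait_for_ready(base_url: str, attempts: int = 30, interval: float = 1.0) -> None:
    """Poll GET {base_url}/ready until it returns 200."""
    for _ in range(attempts):
        try:
            if requests.get(f"{base_url}/ready", timeout=5).status_code == 200:
                return
        except requests.ConnectionError:
            pass  # controller not listening yet
        time.sleep(interval)
    raise TimeoutError(f"{base_url} did not become ready")

# e.g. wait_for_ready("http://127.0.0.1:1234")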
|
||||
|
||||
if register_pageservers:
|
||||
# Special case for forward compat tests, this can be removed later.
|
||||
for pageserver in self.pageservers:
|
||||
self.storage_controller.node_register(pageserver)
|
||||
|
||||
# Start up broker, pageserver and all safekeepers
|
||||
futs = []
|
||||
@@ -1153,7 +1158,7 @@ class NeonEnv:
|
||||
if ps_assert_metric_no_errors:
|
||||
pageserver.assert_no_metric_errors()
|
||||
pageserver.stop(immediate=immediate)
|
||||
self.attachment_service.stop(immediate=immediate)
|
||||
self.storage_controller.stop(immediate=immediate)
|
||||
self.broker.stop(immediate=immediate)
|
||||
|
||||
@property
|
||||
@@ -1188,9 +1193,9 @@ class NeonEnv:
|
||||
def get_tenant_pageserver(self, tenant_id: Union[TenantId, TenantShardId]):
|
||||
"""
|
||||
Get the NeonPageserver where this tenant shard is currently attached, according
|
||||
to the attachment service.
|
||||
to the storage controller.
|
||||
"""
|
||||
meta = self.attachment_service.inspect(tenant_id)
|
||||
meta = self.storage_controller.inspect(tenant_id)
|
||||
if meta is None:
|
||||
return None
|
||||
pageserver_id = meta[1]
|
||||
@@ -1697,12 +1702,12 @@ class NeonCli(AbstractNeonCli):
|
||||
res.check_returncode()
|
||||
return res
|
||||
|
||||
def attachment_service_start(self):
|
||||
cmd = ["attachment_service", "start"]
|
||||
def storage_controller_start(self):
|
||||
cmd = ["storage_controller", "start"]
|
||||
return self.raw_cli(cmd)
|
||||
|
||||
def attachment_service_stop(self, immediate: bool):
|
||||
cmd = ["attachment_service", "stop"]
|
||||
def storage_controller_stop(self, immediate: bool):
|
||||
cmd = ["storage_controller", "stop"]
|
||||
if immediate:
|
||||
cmd.extend(["-m", "immediate"])
|
||||
return self.raw_cli(cmd)
|
||||
@@ -1712,10 +1717,8 @@ class NeonCli(AbstractNeonCli):
|
||||
id: int,
|
||||
overrides: Tuple[str, ...] = (),
|
||||
extra_env_vars: Optional[Dict[str, str]] = None,
|
||||
register: bool = True,
|
||||
) -> "subprocess.CompletedProcess[str]":
|
||||
register_str = "true" if register else "false"
|
||||
start_args = ["pageserver", "start", f"--id={id}", *overrides, f"--register={register_str}"]
|
||||
start_args = ["pageserver", "start", f"--id={id}", *overrides]
|
||||
storage = self.env.pageserver_remote_storage
|
||||
append_pageserver_param_overrides(
|
||||
params_to_update=start_args,
|
||||
@@ -1942,14 +1945,14 @@ class Pagectl(AbstractNeonCli):
|
||||
return IndexPartDump.from_json(parsed)
|
||||
|
||||
|
||||
class AttachmentServiceApiException(Exception):
|
||||
class StorageControllerApiException(Exception):
|
||||
def __init__(self, message, status_code: int):
|
||||
super().__init__(message)
|
||||
self.message = message
|
||||
self.status_code = status_code
|
||||
|
||||
|
||||
class NeonAttachmentService(MetricsGetter):
|
||||
class NeonStorageController(MetricsGetter):
|
||||
def __init__(self, env: NeonEnv, auth_enabled: bool):
|
||||
self.env = env
|
||||
self.running = False
|
||||
@@ -1957,13 +1960,13 @@ class NeonAttachmentService(MetricsGetter):
|
||||
|
||||
def start(self):
|
||||
assert not self.running
|
||||
self.env.neon_cli.attachment_service_start()
|
||||
self.env.neon_cli.storage_controller_start()
|
||||
self.running = True
|
||||
return self
|
||||
|
||||
def stop(self, immediate: bool = False) -> "NeonAttachmentService":
|
||||
def stop(self, immediate: bool = False) -> "NeonStorageController":
|
||||
if self.running:
|
||||
self.env.neon_cli.attachment_service_stop(immediate)
|
||||
self.env.neon_cli.storage_controller_stop(immediate)
|
||||
self.running = False
|
||||
return self
|
||||
|
||||
@@ -1976,22 +1979,22 @@ class NeonAttachmentService(MetricsGetter):
|
||||
msg = res.json()["msg"]
|
||||
except: # noqa: E722
|
||||
msg = ""
|
||||
raise AttachmentServiceApiException(msg, res.status_code) from e
|
||||
raise StorageControllerApiException(msg, res.status_code) from e
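The error-translation helper above follows a common requests pattern; here is a self-contained sketch of the same idea, with ControllerApiError standing in for StorageControllerApiException.

import requests

class ControllerApiError(Exception):
    # Illustrative counterpart of StorageControllerApiException.
    def __init__(self, message: str, status_code: int):
        super().__init__(message)
        self.message = message
        self.status_code = status_code

def raise_for_controller_error(res: requests.Response) -> None:
    """Re-raise HTTP errors with the controller's JSON 'msg' field attached."""
    try:
        res.raise_for_status()
    except requests.HTTPError as e:
        try:
            msg = res.json().get("msg", "")
        except ValueError:  # body was not JSON
            msg = ""
        raise ControllerApiError(msg, res.status_code) from e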
|
||||
|
||||
def pageserver_api(self) -> PageserverHttpClient:
|
||||
"""
|
||||
The attachment service implements a subset of the pageserver REST API, for mapping
|
||||
The storage controller implements a subset of the pageserver REST API, for mapping
|
||||
per-tenant actions into per-shard actions (e.g. timeline creation). Tests should invoke those
|
||||
functions via the HttpClient, as an implicit check that these APIs remain compatible.
|
||||
"""
|
||||
auth_token = None
|
||||
if self.auth_enabled:
|
||||
auth_token = self.env.auth_keys.generate_token(scope=TokenScope.PAGE_SERVER_API)
|
||||
return PageserverHttpClient(self.env.attachment_service_port, lambda: True, auth_token)
|
||||
return PageserverHttpClient(self.env.storage_controller_port, lambda: True, auth_token)
|
||||
|
||||
def request(self, method, *args, **kwargs) -> requests.Response:
|
||||
resp = requests.request(method, *args, **kwargs)
|
||||
NeonAttachmentService.raise_api_exception(resp)
|
||||
NeonStorageController.raise_api_exception(resp)
|
||||
|
||||
return resp
|
||||
|
||||
@@ -2004,15 +2007,15 @@ class NeonAttachmentService(MetricsGetter):
|
||||
return headers
|
||||
|
||||
def get_metrics(self) -> Metrics:
|
||||
res = self.request("GET", f"{self.env.attachment_service_api}/metrics")
|
||||
res = self.request("GET", f"{self.env.storage_controller_api}/metrics")
|
||||
return parse_metrics(res.text)
|
||||
|
||||
def ready(self) -> bool:
|
||||
status = None
|
||||
try:
|
||||
resp = self.request("GET", f"{self.env.attachment_service_api}/ready")
|
||||
resp = self.request("GET", f"{self.env.storage_controller_api}/ready")
|
||||
status = resp.status_code
|
||||
except AttachmentServiceApiException as e:
|
||||
except StorageControllerApiException as e:
|
||||
status = e.status_code
|
||||
|
||||
if status == 503:
|
||||
@@ -2027,7 +2030,7 @@ class NeonAttachmentService(MetricsGetter):
|
||||
) -> int:
|
||||
response = self.request(
|
||||
"POST",
|
||||
f"{self.env.attachment_service_api}/debug/v1/attach-hook",
|
||||
f"{self.env.storage_controller_api}/debug/v1/attach-hook",
|
||||
json={"tenant_shard_id": str(tenant_shard_id), "node_id": pageserver_id},
|
||||
headers=self.headers(TokenScope.ADMIN),
|
||||
)
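As a usage note, this attach-hook debug endpoint is how the fixtures mint a new generation for a pageserver outside the normal re-attach flow. A minimal sketch with raw requests follows; the bearer-token header and the "gen" response field are assumptions based on how the fixture consumes the response, not a documented contract.

import requests

def issue_generation(api: str, token: str, tenant_shard_id: str, node_id: int) -> int:
    # POST the attach-hook request and return the generation number.
    # The "gen" field name is an assumption; check AttachHookResponse for the real schema.
    res = requests.post(
        f"{api}/debug/v1/attach-hook",
        json={"tenant_shard_id": tenant_shard_id, "node_id": node_id},
        headers={"Authorization": f"Bearer {token}"},
    )
    res.raise_for_status()
    return res.json()["gen"]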
|
||||
@@ -2038,7 +2041,7 @@ class NeonAttachmentService(MetricsGetter):
|
||||
def attach_hook_drop(self, tenant_shard_id: Union[TenantId, TenantShardId]):
|
||||
self.request(
|
||||
"POST",
|
||||
f"{self.env.attachment_service_api}/debug/v1/attach-hook",
|
||||
f"{self.env.storage_controller_api}/debug/v1/attach-hook",
|
||||
json={"tenant_shard_id": str(tenant_shard_id), "node_id": None},
|
||||
headers=self.headers(TokenScope.ADMIN),
|
||||
)
|
||||
@@ -2049,7 +2052,7 @@ class NeonAttachmentService(MetricsGetter):
|
||||
"""
|
||||
response = self.request(
|
||||
"POST",
|
||||
f"{self.env.attachment_service_api}/debug/v1/inspect",
|
||||
f"{self.env.storage_controller_api}/debug/v1/inspect",
|
||||
json={"tenant_shard_id": str(tenant_shard_id)},
|
||||
headers=self.headers(TokenScope.ADMIN),
|
||||
)
|
||||
@@ -2066,11 +2069,13 @@ class NeonAttachmentService(MetricsGetter):
|
||||
"node_id": int(node.id),
|
||||
"listen_http_addr": "localhost",
|
||||
"listen_http_port": node.service_port.http,
|
||||
"listen_pg_addr": "localhost",
|
||||
"listen_pg_port": node.service_port.pg,
|
||||
}
|
||||
log.info(f"node_register({body})")
|
||||
self.request(
|
||||
"POST",
|
||||
f"{self.env.attachment_service_api}/control/v1/node",
|
||||
f"{self.env.storage_controller_api}/control/v1/node",
|
||||
json=body,
|
||||
headers=self.headers(TokenScope.ADMIN),
|
||||
)
|
||||
@@ -2078,7 +2083,7 @@ class NeonAttachmentService(MetricsGetter):
|
||||
def node_list(self):
|
||||
response = self.request(
|
||||
"GET",
|
||||
f"{self.env.attachment_service_api}/control/v1/node",
|
||||
f"{self.env.storage_controller_api}/control/v1/node",
|
||||
headers=self.headers(TokenScope.ADMIN),
|
||||
)
|
||||
return response.json()
|
||||
@@ -2088,7 +2093,7 @@ class NeonAttachmentService(MetricsGetter):
|
||||
body["node_id"] = node_id
|
||||
self.request(
|
||||
"PUT",
|
||||
f"{self.env.attachment_service_api}/control/v1/node/{node_id}/config",
|
||||
f"{self.env.storage_controller_api}/control/v1/node/{node_id}/config",
|
||||
json=body,
|
||||
headers=self.headers(TokenScope.ADMIN),
|
||||
)
|
||||
@@ -2118,7 +2123,7 @@ class NeonAttachmentService(MetricsGetter):
|
||||
|
||||
response = self.request(
|
||||
"POST",
|
||||
f"{self.env.attachment_service_api}/v1/tenant",
|
||||
f"{self.env.storage_controller_api}/v1/tenant",
|
||||
json=body,
|
||||
headers=self.headers(TokenScope.PAGE_SERVER_API),
|
||||
)
|
||||
@@ -2130,18 +2135,20 @@ class NeonAttachmentService(MetricsGetter):
|
||||
"""
|
||||
response = self.request(
|
||||
"GET",
|
||||
f"{self.env.attachment_service_api}/control/v1/tenant/{tenant_id}/locate",
|
||||
f"{self.env.storage_controller_api}/control/v1/tenant/{tenant_id}/locate",
|
||||
headers=self.headers(TokenScope.ADMIN),
|
||||
)
|
||||
body = response.json()
|
||||
shards: list[dict[str, Any]] = body["shards"]
|
||||
return shards
|
||||
|
||||
def tenant_shard_split(self, tenant_id: TenantId, shard_count: int) -> list[TenantShardId]:
|
||||
def tenant_shard_split(
|
||||
self, tenant_id: TenantId, shard_count: int, shard_stripe_size: Optional[int] = None
|
||||
) -> list[TenantShardId]:
|
||||
response = self.request(
|
||||
"PUT",
|
||||
f"{self.env.attachment_service_api}/control/v1/tenant/{tenant_id}/shard_split",
|
||||
json={"new_shard_count": shard_count},
|
||||
f"{self.env.storage_controller_api}/control/v1/tenant/{tenant_id}/shard_split",
|
||||
json={"new_shard_count": shard_count, "new_stripe_size": shard_stripe_size},
|
||||
headers=self.headers(TokenScope.ADMIN),
|
||||
)
|
||||
body = response.json()
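Since the split endpoint now accepts an optional stripe size, a hedged HTTP-level sketch of the call is shown below; the "new_shards" response field mirrors how the fixture consumes the body and is an assumption here.

from typing import List, Optional

import requests

def shard_split(api: str, token: str, tenant_id: str,
                shard_count: int, stripe_size: Optional[int] = None) -> List[str]:
    # PUT the split request; new_stripe_size may be None to keep the current stripe size.
    res = requests.put(
        f"{api}/control/v1/tenant/{tenant_id}/shard_split",
        json={"new_shard_count": shard_count, "new_stripe_size": stripe_size},
        headers={"Authorization": f"Bearer {token}"},
    )
    res.raise_for_status()
    return res.json()["new_shards"]  # assumed field name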
|
||||
@@ -2152,7 +2159,7 @@ class NeonAttachmentService(MetricsGetter):
|
||||
def tenant_shard_migrate(self, tenant_shard_id: TenantShardId, dest_ps_id: int):
|
||||
self.request(
|
||||
"PUT",
|
||||
f"{self.env.attachment_service_api}/control/v1/tenant/{tenant_shard_id}/migrate",
|
||||
f"{self.env.storage_controller_api}/control/v1/tenant/{tenant_shard_id}/migrate",
|
||||
json={"tenant_shard_id": str(tenant_shard_id), "node_id": dest_ps_id},
|
||||
headers=self.headers(TokenScope.ADMIN),
|
||||
)
|
||||
@@ -2165,12 +2172,12 @@ class NeonAttachmentService(MetricsGetter):
|
||||
"""
|
||||
self.request(
|
||||
"POST",
|
||||
f"{self.env.attachment_service_api}/debug/v1/consistency_check",
|
||||
f"{self.env.storage_controller_api}/debug/v1/consistency_check",
|
||||
headers=self.headers(TokenScope.ADMIN),
|
||||
)
|
||||
log.info("Attachment service passed consistency check")
|
||||
log.info("storage controller passed consistency check")
|
||||
|
||||
def __enter__(self) -> "NeonAttachmentService":
|
||||
def __enter__(self) -> "NeonStorageController":
|
||||
return self
|
||||
|
||||
def __exit__(
|
||||
@@ -2233,7 +2240,6 @@ class NeonPageserver(PgProtocol):
|
||||
self,
|
||||
overrides: Tuple[str, ...] = (),
|
||||
extra_env_vars: Optional[Dict[str, str]] = None,
|
||||
register: bool = True,
|
||||
) -> "NeonPageserver":
|
||||
"""
|
||||
Start the page server.
|
||||
@@ -2243,7 +2249,7 @@ class NeonPageserver(PgProtocol):
|
||||
assert self.running is False
|
||||
|
||||
self.env.neon_cli.pageserver_start(
|
||||
self.id, overrides=overrides, extra_env_vars=extra_env_vars, register=register
|
||||
self.id, overrides=overrides, extra_env_vars=extra_env_vars
|
||||
)
|
||||
self.running = True
|
||||
return self
|
||||
@@ -2401,7 +2407,7 @@ class NeonPageserver(PgProtocol):
|
||||
"""
|
||||
client = self.http_client()
|
||||
if generation is None:
|
||||
generation = self.env.attachment_service.attach_hook_issue(tenant_id, self.id)
|
||||
generation = self.env.storage_controller.attach_hook_issue(tenant_id, self.id)
|
||||
return client.tenant_attach(
|
||||
tenant_id,
|
||||
config,
|
||||
@@ -2410,14 +2416,14 @@ class NeonPageserver(PgProtocol):
|
||||
)
|
||||
|
||||
def tenant_detach(self, tenant_id: TenantId):
|
||||
self.env.attachment_service.attach_hook_drop(tenant_id)
|
||||
self.env.storage_controller.attach_hook_drop(tenant_id)
|
||||
|
||||
client = self.http_client()
|
||||
return client.tenant_detach(tenant_id)
|
||||
|
||||
def tenant_location_configure(self, tenant_id: TenantId, config: dict[str, Any], **kwargs):
|
||||
if config["mode"].startswith("Attached") and "generation" not in config:
|
||||
config["generation"] = self.env.attachment_service.attach_hook_issue(tenant_id, self.id)
|
||||
config["generation"] = self.env.storage_controller.attach_hook_issue(tenant_id, self.id)
|
||||
|
||||
client = self.http_client()
|
||||
return client.tenant_location_conf(tenant_id, config, **kwargs)
|
||||
@@ -2441,14 +2447,14 @@ class NeonPageserver(PgProtocol):
|
||||
generation: Optional[int] = None,
|
||||
) -> TenantId:
|
||||
if generation is None:
|
||||
generation = self.env.attachment_service.attach_hook_issue(tenant_id, self.id)
|
||||
generation = self.env.storage_controller.attach_hook_issue(tenant_id, self.id)
|
||||
client = self.http_client(auth_token=auth_token)
|
||||
return client.tenant_create(tenant_id, conf, generation=generation)
|
||||
|
||||
def tenant_load(self, tenant_id: TenantId):
|
||||
client = self.http_client()
|
||||
return client.tenant_load(
|
||||
tenant_id, generation=self.env.attachment_service.attach_hook_issue(tenant_id, self.id)
|
||||
tenant_id, generation=self.env.storage_controller.attach_hook_issue(tenant_id, self.id)
|
||||
)
|
||||
|
||||
|
||||
@@ -2859,6 +2865,7 @@ class NeonProxy(PgProtocol):
|
||||
self.auth_backend = auth_backend
|
||||
self.metric_collection_endpoint = metric_collection_endpoint
|
||||
self.metric_collection_interval = metric_collection_interval
|
||||
self.http_timeout_seconds = 15
|
||||
self._popen: Optional[subprocess.Popen[bytes]] = None
|
||||
|
||||
def start(self) -> NeonProxy:
|
||||
@@ -2897,6 +2904,7 @@ class NeonProxy(PgProtocol):
|
||||
*["--proxy", f"{self.host}:{self.proxy_port}"],
|
||||
*["--mgmt", f"{self.host}:{self.mgmt_port}"],
|
||||
*["--wss", f"{self.host}:{self.external_http_port}"],
|
||||
*["--sql-over-http-timeout", f"{self.http_timeout_seconds}s"],
|
||||
*["-c", str(crt_path)],
|
||||
*["-k", str(key_path)],
|
||||
*self.auth_backend.extra_args(),
|
||||
@@ -2937,6 +2945,8 @@ class NeonProxy(PgProtocol):
|
||||
password = quote(kwargs["password"])
|
||||
expected_code = kwargs.get("expected_code")
|
||||
|
||||
log.info(f"Executing http query: {query}")
|
||||
|
||||
connstr = f"postgresql://{user}:{password}@{self.domain}:{self.proxy_port}/postgres"
|
||||
response = requests.post(
|
||||
f"https://{self.domain}:{self.external_http_port}/sql",
|
||||
@@ -2959,6 +2969,8 @@ class NeonProxy(PgProtocol):
|
||||
password = kwargs["password"]
|
||||
expected_code = kwargs.get("expected_code")
|
||||
|
||||
log.info(f"Executing http2 query: {query}")
|
||||
|
||||
connstr = f"postgresql://{user}:{password}@{self.domain}:{self.proxy_port}/postgres"
|
||||
async with httpx.AsyncClient(
|
||||
http2=True, verify=str(self.test_output_dir / "proxy.crt")
|
||||
@@ -3907,7 +3919,7 @@ def check_restored_datadir_content(test_output_dir: Path, env: NeonEnv, endpoint
|
||||
|
||||
psql_path = os.path.join(pg_bin.pg_bin_path, "psql")
|
||||
|
||||
pageserver_id = env.attachment_service.locate(endpoint.tenant_id)[0]["node_id"]
|
||||
pageserver_id = env.storage_controller.locate(endpoint.tenant_id)[0]["node_id"]
|
||||
cmd = rf"""
|
||||
{psql_path} \
|
||||
--no-psqlrc \
|
||||
@@ -3971,18 +3983,27 @@ def check_restored_datadir_content(test_output_dir: Path, env: NeonEnv, endpoint
|
||||
|
||||
def logical_replication_sync(subscriber: VanillaPostgres, publisher: Endpoint) -> Lsn:
|
||||
"""Wait logical replication subscriber to sync with publisher."""
|
||||
publisher_lsn = Lsn(publisher.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
|
||||
while True:
|
||||
|
||||
def is_synced(publisher_lsn):
|
||||
# Even if pg_stat_subscription.latest_end_lsn has caught up, some tables
# might not be synced yet, because the main apply worker keeps advancing
# while table sync workers are still running.
|
||||
rels_synced = subscriber.safe_psql(
|
||||
"select count(*) = 0 from pg_subscription_rel where srsubstate != 'r'"
|
||||
)[0][0]
|
||||
log.info(f"number of not synced rels: {rels_synced}")
|
||||
assert rels_synced
|
||||
res = subscriber.safe_psql("select latest_end_lsn from pg_catalog.pg_stat_subscription")[0][
|
||||
0
|
||||
]
|
||||
if res:
|
||||
log.info(f"subscriber_lsn={res}")
|
||||
subscriber_lsn = Lsn(res)
|
||||
log.info(f"Subscriber LSN={subscriber_lsn}, publisher LSN={ publisher_lsn}")
|
||||
if subscriber_lsn >= publisher_lsn:
|
||||
return subscriber_lsn
|
||||
time.sleep(0.5)
|
||||
log.info(f"subscriber_lsn={res}")
|
||||
subscriber_lsn = Lsn(res)
|
||||
log.info(f"Subscriber LSN={subscriber_lsn}, publisher LSN={ publisher_lsn}")
|
||||
assert subscriber_lsn >= publisher_lsn
|
||||
|
||||
publisher_lsn = Lsn(publisher.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
|
||||
wait_until(30, 0.5, partial(is_synced, publisher_lsn))
|
||||
return publisher_lsn
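The two-part sync condition is the important bit of this rewrite; a self-contained sketch of it, with run_sql standing in for subscriber.safe_psql(...)[0][0], is:

from typing import Any, Callable

def is_synced(run_sql: Callable[[str], Any], publisher_lsn: Any) -> bool:
    # All subscribed relations must have reached the 'r' (ready) state...
    all_rels_ready = run_sql(
        "select count(*) = 0 from pg_subscription_rel where srsubstate != 'r'"
    )
    # ...and the subscription's latest_end_lsn must have reached the publisher's flush LSN.
    latest_end_lsn = run_sql("select latest_end_lsn from pg_catalog.pg_stat_subscription")
    return bool(all_rels_ready) and latest_end_lsn is not None and latest_end_lsn >= publisher_lsn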
|
||||
|
||||
|
||||
def tenant_get_shards(
|
||||
@@ -3994,7 +4015,7 @@ def tenant_get_shards(
|
||||
us to figure out the shards for a tenant.
|
||||
|
||||
If the caller provides `pageserver_id`, it will be used for all shards, even
|
||||
if the shard is indicated by attachment service to be on some other pageserver.
|
||||
if the shard is indicated by storage controller to be on some other pageserver.
|
||||
|
||||
Caller should iterate over the response to apply their per-pageserver action to
each shard.
|
||||
@@ -4010,7 +4031,7 @@ def tenant_get_shards(
|
||||
TenantShardId.parse(s["shard_id"]),
|
||||
override_pageserver or env.get_pageserver(s["node_id"]),
|
||||
)
|
||||
for s in env.attachment_service.locate(tenant_id)
|
||||
for s in env.storage_controller.locate(tenant_id)
|
||||
]
|
||||
else:
|
||||
# Assume an unsharded tenant
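A sketch of the calling pattern this helper is designed for, with the per-shard action left as a placeholder:

from typing import Callable, Iterable, Tuple, TypeVar

ShardId = TypeVar("ShardId")
Pageserver = TypeVar("Pageserver")

def for_each_shard(
    shards: Iterable[Tuple[ShardId, Pageserver]],
    action: Callable[[Pageserver, ShardId], None],
) -> None:
    # Apply a per-pageserver action to every (shard, pageserver) pair returned
    # by tenant_get_shards(); the action itself is test-specific.
    for shard_id, pageserver in shards:
        action(pageserver, shard_id)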
|
||||
|
||||
@@ -318,6 +318,13 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
|
||||
assert isinstance(res_json["tenant_shards"], list)
|
||||
return res_json
|
||||
|
||||
def tenant_get_location(self, tenant_id: TenantShardId):
|
||||
res = self.get(
|
||||
f"http://localhost:{self.port}/v1/location_config/{tenant_id}",
|
||||
)
|
||||
self.verbose_error(res)
|
||||
return res.json()
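This location_config lookup is what the stripe-size test below uses to verify a split; a hedged sketch of the same check with plain requests (field name as asserted in that test):

import requests

def get_shard_stripe_size(http_port: int, tenant_shard_id: str) -> int:
    # GET the location config for one shard and return its stripe size.
    res = requests.get(f"http://localhost:{http_port}/v1/location_config/{tenant_shard_id}")
    res.raise_for_status()
    return res.json()["shard_stripe_size"]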
|
||||
|
||||
def tenant_delete(self, tenant_id: Union[TenantId, TenantShardId]):
|
||||
res = self.delete(f"http://localhost:{self.port}/v1/tenant/{tenant_id}")
|
||||
self.verbose_error(res)
|
||||
|
||||
@@ -43,7 +43,7 @@ def single_timeline(
|
||||
log.info("detach template tenant form pageserver")
|
||||
env.pageserver.tenant_detach(template_tenant)
|
||||
env.pageserver.allowed_errors.append(
|
||||
# tenant detach causes this because the underlying attach-hook removes the tenant from attachment_service entirely
|
||||
# tenant detach causes this because the underlying attach-hook removes the tenant from storage controller entirely
|
||||
".*Dropped remote consistent LSN updates.*",
|
||||
)
|
||||
|
||||
|
||||
@@ -56,7 +56,7 @@ def setup_env(
|
||||
template_tenant, template_timeline = env.neon_cli.create_tenant(set_default=True)
|
||||
env.pageserver.tenant_detach(template_tenant)
|
||||
env.pageserver.allowed_errors.append(
|
||||
# tenant detach causes this because the underlying attach-hook removes the tenant from attachment_service entirely
|
||||
# tenant detach causes this because the underlying attach-hook removes the tenant from storage controller entirely
|
||||
".*Dropped remote consistent LSN updates.*",
|
||||
)
|
||||
env.pageserver.tenant_attach(template_tenant, config)
|
||||
|
||||
@@ -92,7 +92,7 @@ def setup_tenant_template(env: NeonEnv, n_txns: int):
|
||||
template_tenant, template_timeline = env.neon_cli.create_tenant(set_default=True)
|
||||
env.pageserver.tenant_detach(template_tenant)
|
||||
env.pageserver.allowed_errors.append(
|
||||
# tenant detach causes this because the underlying attach-hook removes the tenant from attachment_service entirely
|
||||
# tenant detach causes this because the underlying attach-hook removes the tenant from storage controller entirely
|
||||
".*Dropped remote consistent LSN updates.*",
|
||||
)
|
||||
env.pageserver.tenant_attach(template_tenant, config)
|
||||
|
||||
@@ -114,7 +114,7 @@ def setup_tenant_template(env: NeonEnv, pg_bin: PgBin, scale: int):
|
||||
template_tenant, template_timeline = env.neon_cli.create_tenant(set_default=True)
|
||||
env.pageserver.tenant_detach(template_tenant)
|
||||
env.pageserver.allowed_errors.append(
|
||||
# tenant detach causes this because the underlying attach-hook removes the tenant from attachment_service entirely
|
||||
# tenant detach causes this because the underlying attach-hook removes the tenant from storage controller entirely
|
||||
".*Dropped remote consistent LSN updates.*",
|
||||
)
|
||||
env.pageserver.tenant_attach(template_tenant, config)
|
||||
|
||||
@@ -56,12 +56,12 @@ def measure_recovery_time(env: NeonCompare):
|
||||
# Delete the Tenant in the pageserver: this will drop local and remote layers, such that
|
||||
# when we "create" the Tenant again, we will replay the WAL from the beginning.
|
||||
#
|
||||
# This is a "weird" thing to do, and can confuse the attachment service as we're re-using
|
||||
# This is a "weird" thing to do, and can confuse the storage controller as we're re-using
|
||||
# the same tenant ID for a tenant that is logically different from the pageserver's point
|
||||
# of view, but the same as far as the safekeeper/WAL is concerned. To work around that,
|
||||
# we will explicitly create the tenant in the same generation that it was previously
|
||||
# attached in.
|
||||
attach_status = env.env.attachment_service.inspect(tenant_shard_id=env.tenant)
|
||||
attach_status = env.env.storage_controller.inspect(tenant_shard_id=env.tenant)
|
||||
assert attach_status is not None
|
||||
(attach_gen, _) = attach_status
|
||||
|
||||
|
||||
@@ -137,7 +137,7 @@ def test_no_config(positive_env: NeonEnv, content_type: Optional[str]):
|
||||
ps_http.tenant_detach(tenant_id)
|
||||
assert tenant_id not in [TenantId(t["id"]) for t in ps_http.tenant_list()]
|
||||
|
||||
body = {"generation": env.attachment_service.attach_hook_issue(tenant_id, env.pageserver.id)}
|
||||
body = {"generation": env.storage_controller.attach_hook_issue(tenant_id, env.pageserver.id)}
|
||||
|
||||
ps_http.post(
|
||||
f"{ps_http.base_url}/v1/tenant/{tenant_id}/attach",
|
||||
|
||||
@@ -85,9 +85,9 @@ def test_change_pageserver(neon_env_builder: NeonEnvBuilder):
|
||||
# the endpoint. Whereas the previous reconfiguration was like a healthy migration, this
|
||||
# is more like what happens in an unexpected pageserver failure.
|
||||
#
|
||||
# Since we're dual-attached, need to tip-off attachment service to treat the one we're
|
||||
# Since we're dual-attached, need to tip-off storage controller to treat the one we're
|
||||
# about to start as the attached pageserver
|
||||
env.attachment_service.attach_hook_issue(env.initial_tenant, env.pageservers[0].id)
|
||||
env.storage_controller.attach_hook_issue(env.initial_tenant, env.pageservers[0].id)
|
||||
env.pageservers[0].start()
|
||||
env.pageservers[1].stop()
|
||||
|
||||
@@ -97,9 +97,9 @@ def test_change_pageserver(neon_env_builder: NeonEnvBuilder):
|
||||
assert fetchone() == (100000,)
|
||||
|
||||
env.pageservers[0].stop()
|
||||
# Since we're dual-attached, need to tip-off attachment service to treat the one we're
|
||||
# Since we're dual-attached, need to tip-off storage controller to treat the one we're
|
||||
# about to start as the attached pageserver
|
||||
env.attachment_service.attach_hook_issue(env.initial_tenant, env.pageservers[1].id)
|
||||
env.storage_controller.attach_hook_issue(env.initial_tenant, env.pageservers[1].id)
|
||||
env.pageservers[1].start()
|
||||
|
||||
# Test a (former) bug where a child process spins without updating its connection string
|
||||
|
||||
@@ -133,7 +133,7 @@ def test_create_snapshot(
|
||||
for sk in env.safekeepers:
|
||||
sk.stop()
|
||||
env.pageserver.stop()
|
||||
env.attachment_service.stop()
|
||||
env.storage_controller.stop()
|
||||
|
||||
# Directory `compatibility_snapshot_dir` is uploaded to S3 in a workflow, keep the name in sync with it
|
||||
compatibility_snapshot_dir = (
|
||||
@@ -242,7 +242,7 @@ def test_forward_compatibility(
|
||||
# everything else: our test code is written for latest CLI args.
|
||||
env.neon_local_binpath = neon_local_binpath
|
||||
|
||||
neon_env_builder.start()
|
||||
neon_env_builder.start(register_pageservers=True)
|
||||
|
||||
check_neon_works(
|
||||
env,
|
||||
|
||||
@@ -159,7 +159,7 @@ def test_issue_5878(neon_env_builder: NeonEnvBuilder):
|
||||
time.sleep(1.1) # so that we can use change in pre_stat.st_mtime to detect overwrites
|
||||
|
||||
def get_generation_number():
|
||||
attachment = env.attachment_service.inspect(tenant_id)
|
||||
attachment = env.storage_controller.inspect(tenant_id)
|
||||
assert attachment is not None
|
||||
return attachment[0]
|
||||
|
||||
|
||||
@@ -133,7 +133,7 @@ def test_cli_start_stop(neon_env_builder: NeonEnvBuilder):
|
||||
# Stop default ps/sk
|
||||
env.neon_cli.pageserver_stop(env.pageserver.id)
|
||||
env.neon_cli.safekeeper_stop()
|
||||
env.neon_cli.attachment_service_stop(False)
|
||||
env.neon_cli.storage_controller_stop(False)
|
||||
|
||||
# Keep NeonEnv state up to date, it usually owns starting/stopping services
|
||||
env.pageserver.running = False
|
||||
@@ -175,7 +175,7 @@ def test_cli_start_stop_multi(neon_env_builder: NeonEnvBuilder):
|
||||
env.neon_cli.safekeeper_stop(neon_env_builder.safekeepers_id_start + 2)
|
||||
|
||||
# Stop this to get out of the way of the following `start`
|
||||
env.neon_cli.attachment_service_stop(False)
|
||||
env.neon_cli.storage_controller_stop(False)
|
||||
|
||||
# Default start
|
||||
res = env.neon_cli.raw_cli(["start"])
|
||||
|
||||
@@ -73,7 +73,7 @@ def check_client(env: NeonEnv, client: PageserverHttpClient):
|
||||
# create new tenant and check it is also there
|
||||
tenant_id = TenantId.generate()
|
||||
client.tenant_create(
|
||||
tenant_id, generation=env.attachment_service.attach_hook_issue(tenant_id, env.pageserver.id)
|
||||
tenant_id, generation=env.storage_controller.attach_hook_issue(tenant_id, env.pageserver.id)
|
||||
)
|
||||
assert tenant_id in {TenantId(t["id"]) for t in client.tenant_list()}
|
||||
|
||||
|
||||
@@ -203,7 +203,10 @@ def test_generations_upgrade(neon_env_builder: NeonEnvBuilder):
|
||||
env.broker.try_start()
|
||||
for sk in env.safekeepers:
|
||||
sk.start()
|
||||
env.attachment_service.start()
|
||||
env.storage_controller.start()
|
||||
|
||||
# We will start a pageserver with no control_plane_api set, so it won't be able to self-register
|
||||
env.storage_controller.node_register(env.pageserver)
|
||||
|
||||
env.pageserver.start(overrides=('--pageserver-config-override=control_plane_api=""',))
|
||||
|
||||
@@ -285,7 +288,7 @@ def test_deferred_deletion(neon_env_builder: NeonEnvBuilder):
|
||||
neon_env_builder.num_pageservers = 2
|
||||
env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF)
|
||||
|
||||
attached_to_id = env.attachment_service.locate(env.initial_tenant)[0]["node_id"]
|
||||
attached_to_id = env.storage_controller.locate(env.initial_tenant)[0]["node_id"]
|
||||
main_pageserver = env.get_pageserver(attached_to_id)
|
||||
other_pageserver = [p for p in env.pageservers if p.id != attached_to_id][0]
|
||||
|
||||
@@ -310,7 +313,7 @@ def test_deferred_deletion(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
# Now advance the generation in the control plane: subsequent validations
|
||||
# from the running pageserver will fail. No more deletions should happen.
|
||||
env.attachment_service.attach_hook_issue(env.initial_tenant, other_pageserver.id)
|
||||
env.storage_controller.attach_hook_issue(env.initial_tenant, other_pageserver.id)
|
||||
generate_uploads_and_deletions(env, init=False, pageserver=main_pageserver)
|
||||
|
||||
assert_deletion_queue(ps_http, lambda n: n > 0)
|
||||
@@ -366,7 +369,7 @@ def test_deletion_queue_recovery(
|
||||
neon_env_builder.num_pageservers = 2
|
||||
env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF)
|
||||
|
||||
attached_to_id = env.attachment_service.locate(env.initial_tenant)[0]["node_id"]
|
||||
attached_to_id = env.storage_controller.locate(env.initial_tenant)[0]["node_id"]
|
||||
main_pageserver = env.get_pageserver(attached_to_id)
|
||||
other_pageserver = [p for p in env.pageservers if p.id != attached_to_id][0]
|
||||
|
||||
@@ -428,7 +431,7 @@ def test_deletion_queue_recovery(
|
||||
|
||||
if keep_attachment == KeepAttachment.LOSE:
|
||||
some_other_pageserver = other_pageserver.id
|
||||
env.attachment_service.attach_hook_issue(env.initial_tenant, some_other_pageserver)
|
||||
env.storage_controller.attach_hook_issue(env.initial_tenant, some_other_pageserver)
|
||||
|
||||
main_pageserver.start()
|
||||
|
||||
@@ -494,7 +497,7 @@ def test_emergency_mode(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
|
||||
)
|
||||
|
||||
# Simulate a major incident: the control plane goes offline
|
||||
env.attachment_service.stop()
|
||||
env.storage_controller.stop()
|
||||
|
||||
# Remember how many validations had happened before the control plane went offline
|
||||
validated = get_deletion_queue_validated(ps_http)
|
||||
@@ -511,7 +514,6 @@ def test_emergency_mode(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
|
||||
env.pageserver.stop() # Non-immediate: implicitly checking that shutdown doesn't hang waiting for CP
|
||||
env.pageserver.start(
|
||||
overrides=("--pageserver-config-override=control_plane_emergency_mode=true",),
|
||||
register=False,
|
||||
)
|
||||
|
||||
# The pageserver should provide service to clients
|
||||
@@ -525,7 +527,7 @@ def test_emergency_mode(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
|
||||
assert get_deletion_queue_executed(ps_http) == 0
|
||||
|
||||
# When the control plane comes back up, normal service should resume
|
||||
env.attachment_service.start()
|
||||
env.storage_controller.start()
|
||||
|
||||
ps_http.deletion_queue_flush(execute=True)
|
||||
assert get_deletion_queue_depth(ps_http) == 0
|
||||
|
||||
@@ -157,7 +157,7 @@ def test_location_conf_churn(neon_env_builder: NeonEnvBuilder, seed: int):
|
||||
workload.churn_rows(rng.randint(128, 256), pageserver.id)
|
||||
workload.validate(pageserver.id)
|
||||
elif last_state_ps[0].startswith("Attached"):
|
||||
# The `attachment_service` will only re-attach on startup when a pageserver was the
|
||||
# The `storage_controller` will only re-attach on startup when a pageserver was the
|
||||
# holder of the latest generation: otherwise the pageserver will revert to detached
|
||||
# state if it was running attached with a stale generation
|
||||
last_state[pageserver.id] = ("Detached", None)
|
||||
@@ -182,12 +182,12 @@ def test_location_conf_churn(neon_env_builder: NeonEnvBuilder, seed: int):
|
||||
generation = last_state_ps[1]
|
||||
else:
|
||||
# Switch generations, while also jumping between attached states
|
||||
generation = env.attachment_service.attach_hook_issue(
|
||||
generation = env.storage_controller.attach_hook_issue(
|
||||
tenant_id, pageserver.id
|
||||
)
|
||||
latest_attached = pageserver.id
|
||||
else:
|
||||
generation = env.attachment_service.attach_hook_issue(tenant_id, pageserver.id)
|
||||
generation = env.storage_controller.attach_hook_issue(tenant_id, pageserver.id)
|
||||
latest_attached = pageserver.id
|
||||
else:
|
||||
generation = None
|
||||
@@ -273,7 +273,7 @@ def test_live_migration(neon_env_builder: NeonEnvBuilder):
|
||||
# Encourage the new location to download while still in secondary mode
|
||||
pageserver_b.http_client().tenant_secondary_download(tenant_id)
|
||||
|
||||
migrated_generation = env.attachment_service.attach_hook_issue(tenant_id, pageserver_b.id)
|
||||
migrated_generation = env.storage_controller.attach_hook_issue(tenant_id, pageserver_b.id)
|
||||
log.info(f"Acquired generation {migrated_generation} for destination pageserver")
|
||||
assert migrated_generation == initial_generation + 1
|
||||
|
||||
@@ -436,7 +436,7 @@ def test_secondary_downloads(neon_env_builder: NeonEnvBuilder):
|
||||
remote_storage_kind=RemoteStorageKind.MOCK_S3,
|
||||
)
|
||||
env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF)
|
||||
assert env.attachment_service is not None
|
||||
assert env.storage_controller is not None
|
||||
assert isinstance(env.pageserver_remote_storage, S3Storage) # Satisfy linter
|
||||
|
||||
tenant_id = env.initial_tenant
|
||||
|
||||
@@ -564,3 +564,35 @@ async def test_sql_over_http2(static_proxy: NeonProxy):
|
||||
"select 42 as answer", [], user="http", password="http", expected_code=200
|
||||
)
|
||||
assert resp["rows"] == [{"answer": 42}]
|
||||
|
||||
|
||||
def test_sql_over_http_timeout_cancel(static_proxy: NeonProxy):
|
||||
static_proxy.safe_psql("create role http with login password 'http' superuser")
|
||||
|
||||
static_proxy.safe_psql("create table test_table ( id int primary key )")
|
||||
|
||||
# insert into a table, with a unique constraint, after sleeping for n seconds
|
||||
query = "WITH temp AS ( \
|
||||
SELECT pg_sleep($1) as sleep, $2::int as id \
|
||||
) INSERT INTO test_table (id) SELECT id FROM temp"
|
||||
|
||||
# expect to fail with timeout
|
||||
res = static_proxy.http_query(
|
||||
query,
|
||||
[static_proxy.http_timeout_seconds + 1, 1],
|
||||
user="http",
|
||||
password="http",
|
||||
expected_code=400,
|
||||
)
|
||||
assert "Query cancelled, runtime exceeded" in res["message"], "HTTP query should time out"
|
||||
|
||||
time.sleep(2)
|
||||
|
||||
res = static_proxy.http_query(query, [1, 1], user="http", password="http", expected_code=200)
|
||||
assert res["command"] == "INSERT", "HTTP query should insert"
|
||||
assert res["rowCount"] == 1, "HTTP query should insert"
|
||||
|
||||
res = static_proxy.http_query(query, [0, 1], user="http", password="http", expected_code=400)
|
||||
assert (
|
||||
"duplicate key value violates unique constraint" in res["message"]
|
||||
), "HTTP query should conflict"
|
||||
|
||||
@@ -169,7 +169,7 @@ def test_remote_storage_backup_and_restore(
|
||||
# Ensure that even though the tenant is broken, retrying the attachment fails
|
||||
with pytest.raises(Exception, match="Tenant state is Broken"):
|
||||
# Use same generation as in previous attempt
|
||||
gen_state = env.attachment_service.inspect(tenant_id)
|
||||
gen_state = env.storage_controller.inspect(tenant_id)
|
||||
assert gen_state is not None
|
||||
generation = gen_state[0]
|
||||
env.pageserver.tenant_attach(tenant_id, generation=generation)
|
||||
@@ -355,7 +355,7 @@ def test_remote_storage_upload_queue_retries(
|
||||
env.pageserver.stop(immediate=True)
|
||||
env.endpoints.stop_all()
|
||||
|
||||
# We are about to forcibly drop local dirs. Attachment service will increment generation in re-attach before
|
||||
# We are about to forcibly drop local dirs. Storage controller will increment generation in re-attach before
|
||||
# we later increment when actually attaching it again, leading to skipping a generation and potentially getting
|
||||
# these warnings if there was a durable but un-executed deletion list at time of restart.
|
||||
env.pageserver.allowed_errors.extend(
|
||||
|
||||
@@ -80,7 +80,7 @@ def test_tenant_s3_restore(
|
||||
assert (
|
||||
ps_http.get_metric_value("pageserver_tenant_manager_slots") == 0
|
||||
), "tenant removed before we deletion was issued"
|
||||
env.attachment_service.attach_hook_drop(tenant_id)
|
||||
env.storage_controller.attach_hook_drop(tenant_id)
|
||||
|
||||
tenant_path = env.pageserver.tenant_dir(tenant_id)
|
||||
assert not tenant_path.exists()
|
||||
@@ -103,7 +103,7 @@ def test_tenant_s3_restore(
|
||||
tenant_id, timestamp=ts_before_deletion, done_if_after=ts_after_deletion
|
||||
)
|
||||
|
||||
generation = env.attachment_service.attach_hook_issue(tenant_id, env.pageserver.id)
|
||||
generation = env.storage_controller.attach_hook_issue(tenant_id, env.pageserver.id)
|
||||
|
||||
ps_http.tenant_attach(tenant_id, generation=generation)
|
||||
env.pageserver.quiesce_tenants()
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import os
|
||||
from typing import Dict, List, Union
|
||||
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
@@ -8,7 +9,11 @@ from fixtures.neon_fixtures import (
|
||||
)
|
||||
from fixtures.remote_storage import s3_storage
|
||||
from fixtures.types import Lsn, TenantShardId, TimelineId
|
||||
from fixtures.utils import wait_until
|
||||
from fixtures.workload import Workload
|
||||
from pytest_httpserver import HTTPServer
|
||||
from werkzeug.wrappers.request import Request
|
||||
from werkzeug.wrappers.response import Response
|
||||
|
||||
|
||||
def test_sharding_smoke(
|
||||
@@ -43,7 +48,7 @@ def test_sharding_smoke(
|
||||
tenant_id = env.initial_tenant
|
||||
|
||||
pageservers = dict((int(p.id), p) for p in env.pageservers)
|
||||
shards = env.attachment_service.locate(tenant_id)
|
||||
shards = env.storage_controller.locate(tenant_id)
|
||||
|
||||
def get_sizes():
|
||||
sizes = {}
|
||||
@@ -86,7 +91,7 @@ def test_sharding_smoke(
|
||||
)
|
||||
assert timelines == {env.initial_timeline, timeline_b}
|
||||
|
||||
env.attachment_service.consistency_check()
|
||||
env.storage_controller.consistency_check()
|
||||
|
||||
|
||||
def test_sharding_split_unsharded(
|
||||
@@ -102,7 +107,7 @@ def test_sharding_split_unsharded(
|
||||
|
||||
# Check that we created with an unsharded TenantShardId: this is the default,
|
||||
# but check it in case we change the default in future
|
||||
assert env.attachment_service.inspect(TenantShardId(tenant_id, 0, 0)) is not None
|
||||
assert env.storage_controller.inspect(TenantShardId(tenant_id, 0, 0)) is not None
|
||||
|
||||
workload = Workload(env, tenant_id, timeline_id, branch_name="main")
|
||||
workload.init()
|
||||
@@ -110,15 +115,15 @@ def test_sharding_split_unsharded(
|
||||
workload.validate()
|
||||
|
||||
# Split one shard into two
|
||||
env.attachment_service.tenant_shard_split(tenant_id, shard_count=2)
|
||||
env.storage_controller.tenant_shard_split(tenant_id, shard_count=2)
|
||||
|
||||
# Check we got the shard IDs we expected
|
||||
assert env.attachment_service.inspect(TenantShardId(tenant_id, 0, 2)) is not None
|
||||
assert env.attachment_service.inspect(TenantShardId(tenant_id, 1, 2)) is not None
|
||||
assert env.storage_controller.inspect(TenantShardId(tenant_id, 0, 2)) is not None
|
||||
assert env.storage_controller.inspect(TenantShardId(tenant_id, 1, 2)) is not None
|
||||
|
||||
workload.validate()
|
||||
|
||||
env.attachment_service.consistency_check()
|
||||
env.storage_controller.consistency_check()
|
||||
|
||||
|
||||
def test_sharding_split_smoke(
|
||||
@@ -161,7 +166,7 @@ def test_sharding_split_smoke(
|
||||
workload.write_rows(256)
|
||||
|
||||
# Note which pageservers initially hold a shard after tenant creation
|
||||
pre_split_pageserver_ids = [loc["node_id"] for loc in env.attachment_service.locate(tenant_id)]
|
||||
pre_split_pageserver_ids = [loc["node_id"] for loc in env.storage_controller.locate(tenant_id)]
|
||||
|
||||
# For pageservers holding a shard, validate their ingest statistics
|
||||
# reflect a proper splitting of the WAL.
|
||||
@@ -213,9 +218,9 @@ def test_sharding_split_smoke(
|
||||
# Before split, old shards exist
|
||||
assert shards_on_disk(old_shard_ids)
|
||||
|
||||
env.attachment_service.tenant_shard_split(tenant_id, shard_count=split_shard_count)
|
||||
env.storage_controller.tenant_shard_split(tenant_id, shard_count=split_shard_count)
|
||||
|
||||
post_split_pageserver_ids = [loc["node_id"] for loc in env.attachment_service.locate(tenant_id)]
|
||||
post_split_pageserver_ids = [loc["node_id"] for loc in env.storage_controller.locate(tenant_id)]
|
||||
# We should have split into 8 shards, on the same 4 pageservers we started on.
|
||||
assert len(post_split_pageserver_ids) == split_shard_count
|
||||
assert len(set(post_split_pageserver_ids)) == shard_count
|
||||
@@ -261,7 +266,7 @@ def test_sharding_split_smoke(
|
||||
# Check that we didn't do any spurious reconciliations.
|
||||
# Total number of reconciles should have been one per original shard, plus
|
||||
# one for each shard that was migrated.
|
||||
reconcile_ok = env.attachment_service.get_metric_value(
|
||||
reconcile_ok = env.storage_controller.get_metric_value(
|
||||
"storage_controller_reconcile_complete_total", filter={"status": "ok"}
|
||||
)
|
||||
assert reconcile_ok == shard_count + split_shard_count // 2
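Spelling the accounting out, assuming the values the surrounding comments imply (4 original shards splitting into 8, with half of the new shards migrating afterwards):

shard_count = 4          # original shard count (assumed from the comments above)
split_shard_count = 8    # shard count after the split
migrated = split_shard_count // 2  # one child of each original shard moves away
assert shard_count + migrated == 8  # matches the reconcile_ok expectation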
|
||||
@@ -269,19 +274,19 @@ def test_sharding_split_smoke(
|
||||
# Check that no cancelled or errored reconciliations occurred: this test does no
|
||||
# failure injection and should run clean.
|
||||
assert (
|
||||
env.attachment_service.get_metric_value(
|
||||
env.storage_controller.get_metric_value(
|
||||
"storage_controller_reconcile_complete_total", filter={"status": "cancel"}
|
||||
)
|
||||
is None
|
||||
)
|
||||
assert (
|
||||
env.attachment_service.get_metric_value(
|
||||
env.storage_controller.get_metric_value(
|
||||
"storage_controller_reconcile_complete_total", filter={"status": "error"}
|
||||
)
|
||||
is None
|
||||
)
|
||||
|
||||
env.attachment_service.consistency_check()
|
||||
env.storage_controller.consistency_check()
|
||||
|
||||
# Validate pageserver state
|
||||
shards_exist: list[TenantShardId] = []
|
||||
@@ -310,6 +315,96 @@ def test_sharding_split_smoke(
|
||||
workload.validate()
|
||||
|
||||
|
||||
@pytest.mark.parametrize("initial_stripe_size", [None, 65536])
|
||||
def test_sharding_split_stripe_size(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
httpserver: HTTPServer,
|
||||
httpserver_listen_address,
|
||||
initial_stripe_size: int,
|
||||
):
|
||||
"""
|
||||
Check that modifying stripe size inline with a shard split works as expected
|
||||
"""
|
||||
(host, port) = httpserver_listen_address
|
||||
neon_env_builder.control_plane_compute_hook_api = f"http://{host}:{port}/notify"
|
||||
neon_env_builder.num_pageservers = 1
|
||||
|
||||
# Set up fake HTTP notify endpoint: we will use this to validate that we receive
|
||||
# the correct stripe size after split.
|
||||
notifications = []
|
||||
|
||||
def handler(request: Request):
|
||||
log.info(f"Notify request: {request}")
|
||||
notifications.append(request.json)
|
||||
return Response(status=200)
|
||||
|
||||
httpserver.expect_request("/notify", method="PUT").respond_with_handler(handler)
|
||||
|
||||
env = neon_env_builder.init_start(
|
||||
initial_tenant_shard_count=1, initial_tenant_shard_stripe_size=initial_stripe_size
|
||||
)
|
||||
tenant_id = env.initial_tenant
|
||||
|
||||
assert len(notifications) == 1
|
||||
expect: Dict[str, Union[List[Dict[str, int]], str, None, int]] = {
|
||||
"tenant_id": str(env.initial_tenant),
|
||||
"stripe_size": None,
|
||||
"shards": [{"node_id": int(env.pageservers[0].id), "shard_number": 0}],
|
||||
}
|
||||
assert notifications[0] == expect
|
||||
|
||||
new_stripe_size = 2048
|
||||
env.storage_controller.tenant_shard_split(
|
||||
tenant_id, shard_count=2, shard_stripe_size=new_stripe_size
|
||||
)
|
||||
|
||||
# Check that we ended up with the stripe size that we expected, both on the pageserver
|
||||
# and in the notifications to compute
|
||||
assert len(notifications) == 2
|
||||
expect_after: Dict[str, Union[List[Dict[str, int]], str, None, int]] = {
|
||||
"tenant_id": str(env.initial_tenant),
|
||||
"stripe_size": new_stripe_size,
|
||||
"shards": [
|
||||
{"node_id": int(env.pageservers[0].id), "shard_number": 0},
|
||||
{"node_id": int(env.pageservers[0].id), "shard_number": 1},
|
||||
],
|
||||
}
|
||||
log.info(f"Got notification: {notifications[1]}")
|
||||
assert notifications[1] == expect_after
|
||||
|
||||
# Inspect the stripe size on the pageserver
|
||||
shard_0_loc = (
|
||||
env.pageservers[0].http_client().tenant_get_location(TenantShardId(tenant_id, 0, 2))
|
||||
)
|
||||
assert shard_0_loc["shard_stripe_size"] == new_stripe_size
|
||||
shard_1_loc = (
|
||||
env.pageservers[0].http_client().tenant_get_location(TenantShardId(tenant_id, 1, 2))
|
||||
)
|
||||
assert shard_1_loc["shard_stripe_size"] == new_stripe_size
|
||||
|
||||
# Ensure stripe size survives a pageserver restart
|
||||
env.pageservers[0].stop()
|
||||
env.pageservers[0].start()
|
||||
shard_0_loc = (
|
||||
env.pageservers[0].http_client().tenant_get_location(TenantShardId(tenant_id, 0, 2))
|
||||
)
|
||||
assert shard_0_loc["shard_stripe_size"] == new_stripe_size
|
||||
shard_1_loc = (
|
||||
env.pageservers[0].http_client().tenant_get_location(TenantShardId(tenant_id, 1, 2))
|
||||
)
|
||||
assert shard_1_loc["shard_stripe_size"] == new_stripe_size
|
||||
|
||||
# Ensure stripe size survives a storage controller restart
|
||||
env.storage_controller.stop()
|
||||
env.storage_controller.start()
|
||||
|
||||
def assert_restart_notification():
|
||||
assert len(notifications) == 3
|
||||
assert notifications[2] == expect_after
|
||||
|
||||
wait_until(10, 1, assert_restart_notification)
|
||||
|
||||
|
||||
@pytest.mark.skipif(
|
||||
# The quantity of data isn't huge, but debug can be _very_ slow, and the things we're
|
||||
# validating in this test don't benefit much from debug assertions.
|
||||
@@ -360,7 +455,7 @@ def test_sharding_ingest(
|
||||
huge_layer_count = 0
|
||||
|
||||
# Inspect the resulting layer map, count how many layers are undersized.
|
||||
for shard in env.attachment_service.locate(tenant_id):
|
||||
for shard in env.storage_controller.locate(tenant_id):
|
||||
pageserver = env.get_pageserver(shard["node_id"])
|
||||
shard_id = shard["shard_id"]
|
||||
layer_map = pageserver.http_client().layer_map_info(shard_id, timeline_id)
|
||||
|
||||
@@ -6,10 +6,10 @@ from typing import Any, Dict, List, Union
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
AttachmentServiceApiException,
|
||||
NeonEnv,
|
||||
NeonEnvBuilder,
|
||||
PgBin,
|
||||
StorageControllerApiException,
|
||||
TokenScope,
|
||||
)
|
||||
from fixtures.pageserver.http import PageserverHttpClient
|
||||
@@ -36,7 +36,7 @@ from werkzeug.wrappers.response import Response
|
||||
def get_node_shard_counts(env: NeonEnv, tenant_ids):
|
||||
counts: defaultdict[str, int] = defaultdict(int)
|
||||
for tid in tenant_ids:
|
||||
for shard in env.attachment_service.locate(tid):
|
||||
for shard in env.storage_controller.locate(tid):
|
||||
counts[shard["node_id"]] += 1
|
||||
return counts
|
||||
|
||||
@@ -62,20 +62,20 @@ def test_sharding_service_smoke(

# Start services by hand so that we can skip a pageserver (this will start + register later)
env.broker.try_start()
env.attachment_service.start()
env.storage_controller.start()
env.pageservers[0].start()
env.pageservers[1].start()
for sk in env.safekeepers:
sk.start()

# The pageservers we started should have registered with the sharding service on startup
nodes = env.attachment_service.node_list()
nodes = env.storage_controller.node_list()
assert len(nodes) == 2
assert set(n["id"] for n in nodes) == {env.pageservers[0].id, env.pageservers[1].id}

# Starting an additional pageserver should register successfully
env.pageservers[2].start()
nodes = env.attachment_service.node_list()
nodes = env.storage_controller.node_list()
assert len(nodes) == 3
assert set(n["id"] for n in nodes) == {ps.id for ps in env.pageservers}

@@ -99,22 +99,22 @@ def test_sharding_service_smoke(
# Creating and deleting timelines should work, using identical API to pageserver
timeline_crud_tenant = next(iter(tenant_ids))
timeline_id = TimelineId.generate()
env.attachment_service.pageserver_api().timeline_create(
env.storage_controller.pageserver_api().timeline_create(
pg_version=PgVersion.NOT_SET, tenant_id=timeline_crud_tenant, new_timeline_id=timeline_id
)
timelines = env.attachment_service.pageserver_api().timeline_list(timeline_crud_tenant)
timelines = env.storage_controller.pageserver_api().timeline_list(timeline_crud_tenant)
assert len(timelines) == 2
assert timeline_id in set(TimelineId(t["timeline_id"]) for t in timelines)
# virtual_ps_http.timeline_delete(tenant_id=timeline_crud_tenant, timeline_id=timeline_id)
timeline_delete_wait_completed(
env.attachment_service.pageserver_api(), timeline_crud_tenant, timeline_id
env.storage_controller.pageserver_api(), timeline_crud_tenant, timeline_id
)
timelines = env.attachment_service.pageserver_api().timeline_list(timeline_crud_tenant)
timelines = env.storage_controller.pageserver_api().timeline_list(timeline_crud_tenant)
assert len(timelines) == 1
assert timeline_id not in set(TimelineId(t["timeline_id"]) for t in timelines)

# Marking a pageserver offline should migrate tenants away from it.
env.attachment_service.node_configure(env.pageservers[0].id, {"availability": "Offline"})
env.storage_controller.node_configure(env.pageservers[0].id, {"availability": "Offline"})

def node_evacuated(node_id: int) -> None:
counts = get_node_shard_counts(env, tenant_ids)
@@ -124,7 +124,7 @@ def test_sharding_service_smoke(

# Marking pageserver active should not migrate anything to it
# immediately
env.attachment_service.node_configure(env.pageservers[0].id, {"availability": "Active"})
env.storage_controller.node_configure(env.pageservers[0].id, {"availability": "Active"})
time.sleep(1)
assert get_node_shard_counts(env, tenant_ids)[env.pageservers[0].id] == 0

@@ -144,13 +144,13 @@ def test_sharding_service_smoke(

# Delete all the tenants
for tid in tenant_ids:
tenant_delete_wait_completed(env.attachment_service.pageserver_api(), tid, 10)
tenant_delete_wait_completed(env.storage_controller.pageserver_api(), tid, 10)

env.attachment_service.consistency_check()
env.storage_controller.consistency_check()

# Set a scheduling policy on one node, create all the tenants, observe
# that the scheduling policy is respected.
env.attachment_service.node_configure(env.pageservers[1].id, {"scheduling": "Draining"})
env.storage_controller.node_configure(env.pageservers[1].id, {"scheduling": "Draining"})

# Create some fresh tenants
tenant_ids = set(TenantId.generate() for i in range(0, tenant_count))
@@ -163,7 +163,7 @@ def test_sharding_service_smoke(
assert counts[env.pageservers[0].id] == tenant_shard_count // 2
assert counts[env.pageservers[2].id] == tenant_shard_count // 2

env.attachment_service.consistency_check()
env.storage_controller.consistency_check()

def test_node_status_after_restart(
@@ -173,28 +173,28 @@ def test_node_status_after_restart(
env = neon_env_builder.init_start()

# Initially we have two online pageservers
nodes = env.attachment_service.node_list()
nodes = env.storage_controller.node_list()
assert len(nodes) == 2

env.pageservers[1].stop()

env.attachment_service.stop()
env.attachment_service.start()
env.storage_controller.stop()
env.storage_controller.start()

def is_ready():
assert env.attachment_service.ready() is True
assert env.storage_controller.ready() is True

wait_until(30, 1, is_ready)

# We loaded nodes from database on restart
nodes = env.attachment_service.node_list()
nodes = env.storage_controller.node_list()
assert len(nodes) == 2

# We should still be able to create a tenant, because the pageserver which is still online
# should have had its availability state set to Active.
env.attachment_service.tenant_create(TenantId.generate())
env.storage_controller.tenant_create(TenantId.generate())

env.attachment_service.consistency_check()
env.storage_controller.consistency_check()

def test_sharding_service_passthrough(
@@ -208,9 +208,9 @@ def test_sharding_service_passthrough(
neon_env_builder.num_pageservers = 2
env = neon_env_builder.init_start()

# We will talk to attachment service as if it was a pageserver, using the pageserver
# We will talk to storage controller as if it was a pageserver, using the pageserver
# HTTP client
client = PageserverHttpClient(env.attachment_service_port, lambda: True)
client = PageserverHttpClient(env.storage_controller_port, lambda: True)
timelines = client.timeline_list(tenant_id=env.initial_tenant)
assert len(timelines) == 1

@@ -221,22 +221,22 @@ def test_sharding_service_passthrough(
}
assert status["state"]["slug"] == "Active"

env.attachment_service.consistency_check()
env.storage_controller.consistency_check()

def test_sharding_service_restart(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
tenant_a = env.initial_tenant
tenant_b = TenantId.generate()
env.attachment_service.tenant_create(tenant_b)
env.storage_controller.tenant_create(tenant_b)
env.pageserver.tenant_detach(tenant_a)

# TODO: extend this test to use multiple pageservers, and check that locations don't move around
# on restart.

# Attachment service restart
env.attachment_service.stop()
env.attachment_service.start()
# Storage controller restart
env.storage_controller.stop()
env.storage_controller.start()

observed = set(TenantId(tenant["id"]) for tenant in env.pageserver.http_client().tenant_list())

@@ -255,7 +255,7 @@ def test_sharding_service_restart(neon_env_builder: NeonEnvBuilder):
assert tenant_a not in observed
assert tenant_b in observed

env.attachment_service.consistency_check()
env.storage_controller.consistency_check()

@pytest.mark.parametrize("warm_up", [True, False])
@@ -271,27 +271,26 @@ def test_sharding_service_onboarding(neon_env_builder: NeonEnvBuilder, warm_up:
# Start services by hand so that we can skip registration on one of the pageservers
env = neon_env_builder.init_configs()
env.broker.try_start()
env.attachment_service.start()
env.storage_controller.start()

# This is the pageserver where we'll initially create the tenant. Run it in emergency
# mode so that it doesn't talk to storage controller, and do not register it.
env.pageservers[0].allowed_errors.append(".*Emergency mode!.*")
env.pageservers[0].start(
overrides=("--pageserver-config-override=control_plane_emergency_mode=true",),
register=False,
)
origin_ps = env.pageservers[0]

# This is the pageserver managed by the sharding service, where the tenant
# will be attached after onboarding
env.pageservers[1].start(register=True)
env.pageservers[1].start()
dest_ps = env.pageservers[1]
virtual_ps_http = PageserverHttpClient(env.attachment_service_port, lambda: True)
virtual_ps_http = PageserverHttpClient(env.storage_controller_port, lambda: True)

for sk in env.safekeepers:
sk.start()

# Create a tenant directly via pageserver HTTP API, skipping the attachment service
# Create a tenant directly via pageserver HTTP API, skipping the storage controller
tenant_id = TenantId.generate()
generation = 123
origin_ps.http_client().tenant_create(tenant_id, generation=generation)
@@ -324,7 +323,7 @@ def test_sharding_service_onboarding(neon_env_builder: NeonEnvBuilder, warm_up:

virtual_ps_http.tenant_secondary_download(tenant_id)

# Call into attachment service to onboard the tenant
# Call into storage controller to onboard the tenant
generation += 1
virtual_ps_http.tenant_location_conf(
tenant_id,
@@ -347,7 +346,7 @@ def test_sharding_service_onboarding(neon_env_builder: NeonEnvBuilder, warm_up:
},
)

# As if doing a live migration, call into the attachment service to
# As if doing a live migration, call into the storage controller to
# set it to AttachedSingle: this is a no-op, but we test it because the
# cloud control plane may call this for symmetry with live migration to
# an individual pageserver
@@ -375,8 +374,8 @@ def test_sharding_service_onboarding(neon_env_builder: NeonEnvBuilder, warm_up:
assert dest_tenants[0]["generation"] == generation + 1

# The onboarded tenant should survive a restart of sharding service
env.attachment_service.stop()
env.attachment_service.start()
env.storage_controller.stop()
env.storage_controller.start()

# The onboarded tenant should survive a restart of pageserver
dest_ps.stop()
@@ -407,7 +406,7 @@ def test_sharding_service_onboarding(neon_env_builder: NeonEnvBuilder, warm_up:
dest_tenant_conf_after = dest_ps.http_client().tenant_config(tenant_id)
assert dest_tenant_conf_after.tenant_specific_overrides == modified_tenant_conf

env.attachment_service.consistency_check()
env.storage_controller.consistency_check()

def test_sharding_service_compute_hook(
@@ -419,7 +418,7 @@ def test_sharding_service_compute_hook(
Test that the sharding service calls out to the configured HTTP endpoint on attachment changes
"""

# We will run two pageserver to migrate and check that the attachment service sends notifications
# We will run two pageserver to migrate and check that the storage controller sends notifications
# when migrating.
neon_env_builder.num_pageservers = 2
(host, port) = httpserver_listen_address
@@ -450,7 +449,7 @@ def test_sharding_service_compute_hook(
}
assert notifications[0] == expect

env.attachment_service.node_configure(env.pageservers[0].id, {"availability": "Offline"})
env.storage_controller.node_configure(env.pageservers[0].id, {"availability": "Offline"})

def node_evacuated(node_id: int) -> None:
counts = get_node_shard_counts(env, [env.initial_tenant])
@@ -473,8 +472,8 @@ def test_sharding_service_compute_hook(
wait_until(20, 0.25, received_migration_notification)

# When we restart, we should re-emit notifications for all tenants
env.attachment_service.stop()
env.attachment_service.start()
env.storage_controller.stop()
env.storage_controller.start()

def received_restart_notification():
assert len(notifications) == 3
@@ -483,7 +482,7 @@ def test_sharding_service_compute_hook(
wait_until(10, 1, received_restart_notification)

# Splitting a tenant should cause its stripe size to become visible in the compute notification
env.attachment_service.tenant_shard_split(env.initial_tenant, shard_count=2)
env.storage_controller.tenant_shard_split(env.initial_tenant, shard_count=2)
expect = {
"tenant_id": str(env.initial_tenant),
"stripe_size": 32768,
@@ -499,7 +498,7 @@ def test_sharding_service_compute_hook(

wait_until(10, 1, received_split_notification)

env.attachment_service.consistency_check()
env.storage_controller.consistency_check()

def test_sharding_service_debug_apis(neon_env_builder: NeonEnvBuilder):
@@ -512,55 +511,55 @@ def test_sharding_service_debug_apis(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()

tenant_id = TenantId.generate()
env.attachment_service.tenant_create(tenant_id, shard_count=2, shard_stripe_size=8192)
env.storage_controller.tenant_create(tenant_id, shard_count=2, shard_stripe_size=8192)

# Check that the consistency check passes on a freshly setup system
env.attachment_service.consistency_check()
env.storage_controller.consistency_check()

# These APIs are intentionally not implemented as methods on NeonAttachmentService, as
# These APIs are intentionally not implemented as methods on NeonStorageController, as
# they're just for use in unanticipated circumstances.

# Initial tenant (1 shard) and the one we just created (2 shards) should be visible
response = env.attachment_service.request(
response = env.storage_controller.request(
"GET",
f"{env.attachment_service_api}/debug/v1/tenant",
headers=env.attachment_service.headers(TokenScope.ADMIN),
f"{env.storage_controller_api}/debug/v1/tenant",
headers=env.storage_controller.headers(TokenScope.ADMIN),
)
assert len(response.json()) == 3

# Scheduler should report the expected nodes and shard counts
response = env.attachment_service.request(
"GET", f"{env.attachment_service_api}/debug/v1/scheduler"
response = env.storage_controller.request(
"GET", f"{env.storage_controller_api}/debug/v1/scheduler"
)
# Two nodes, in a dict of node_id->node
assert len(response.json()["nodes"]) == 2
assert sum(v["shard_count"] for v in response.json()["nodes"].values()) == 3
assert all(v["may_schedule"] for v in response.json()["nodes"].values())

response = env.attachment_service.request(
response = env.storage_controller.request(
"POST",
f"{env.attachment_service_api}/debug/v1/node/{env.pageservers[1].id}/drop",
headers=env.attachment_service.headers(TokenScope.ADMIN),
f"{env.storage_controller_api}/debug/v1/node/{env.pageservers[1].id}/drop",
headers=env.storage_controller.headers(TokenScope.ADMIN),
)
assert len(env.attachment_service.node_list()) == 1
assert len(env.storage_controller.node_list()) == 1

response = env.attachment_service.request(
response = env.storage_controller.request(
"POST",
f"{env.attachment_service_api}/debug/v1/tenant/{tenant_id}/drop",
headers=env.attachment_service.headers(TokenScope.ADMIN),
f"{env.storage_controller_api}/debug/v1/tenant/{tenant_id}/drop",
headers=env.storage_controller.headers(TokenScope.ADMIN),
)

# Tenant drop should be reflected in dump output
response = env.attachment_service.request(
response = env.storage_controller.request(
"GET",
f"{env.attachment_service_api}/debug/v1/tenant",
headers=env.attachment_service.headers(TokenScope.ADMIN),
f"{env.storage_controller_api}/debug/v1/tenant",
headers=env.storage_controller.headers(TokenScope.ADMIN),
)
assert len(response.json()) == 1

# Check that the 'drop' APIs didn't leave things in a state that would fail a consistency check: they're
# meant to be unclean wrt the pageserver state, but not leave a broken storage controller behind.
env.attachment_service.consistency_check()
env.storage_controller.consistency_check()

def test_sharding_service_s3_time_travel_recovery(
@@ -584,10 +583,10 @@ def test_sharding_service_s3_time_travel_recovery(
neon_env_builder.num_pageservers = 1

env = neon_env_builder.init_start()
virtual_ps_http = PageserverHttpClient(env.attachment_service_port, lambda: True)
virtual_ps_http = PageserverHttpClient(env.storage_controller_port, lambda: True)

tenant_id = TenantId.generate()
env.attachment_service.tenant_create(
env.storage_controller.tenant_create(
tenant_id,
shard_count=2,
shard_stripe_size=8192,
@@ -595,7 +594,7 @@ def test_sharding_service_s3_time_travel_recovery(
)

# Check that the consistency check passes
env.attachment_service.consistency_check()
env.storage_controller.consistency_check()

branch_name = "main"
timeline_id = env.neon_cli.create_timeline(
@@ -670,28 +669,28 @@ def test_sharding_service_s3_time_travel_recovery(
with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint:
endpoint.safe_psql("SELECT * FROM created_foo;")

env.attachment_service.consistency_check()
env.storage_controller.consistency_check()

def test_sharding_service_auth(neon_env_builder: NeonEnvBuilder):
neon_env_builder.auth_enabled = True
env = neon_env_builder.init_start()
svc = env.attachment_service
api = env.attachment_service_api
svc = env.storage_controller
api = env.storage_controller_api

tenant_id = TenantId.generate()
body: Dict[str, Any] = {"new_tenant_id": str(tenant_id)}

# No token
with pytest.raises(
AttachmentServiceApiException,
StorageControllerApiException,
match="Unauthorized: missing authorization header",
):
svc.request("POST", f"{env.attachment_service_api}/v1/tenant", json=body)
svc.request("POST", f"{env.storage_controller_api}/v1/tenant", json=body)

# Token with incorrect scope
with pytest.raises(
AttachmentServiceApiException,
StorageControllerApiException,
match="Forbidden: JWT authentication error",
):
svc.request("POST", f"{api}/v1/tenant", json=body, headers=svc.headers(TokenScope.ADMIN))
@@ -703,14 +702,14 @@ def test_sharding_service_auth(neon_env_builder: NeonEnvBuilder):

# No token
with pytest.raises(
AttachmentServiceApiException,
StorageControllerApiException,
match="Unauthorized: missing authorization header",
):
svc.request("GET", f"{api}/debug/v1/tenant")

# Token with incorrect scope
with pytest.raises(
AttachmentServiceApiException,
StorageControllerApiException,
match="Forbidden: JWT authentication error",
):
svc.request(
@@ -719,14 +718,14 @@ def test_sharding_service_auth(neon_env_builder: NeonEnvBuilder):

# No token
with pytest.raises(
AttachmentServiceApiException,
StorageControllerApiException,
match="Unauthorized: missing authorization header",
):
svc.request("POST", f"{api}/upcall/v1/re-attach")

# Token with incorrect scope
with pytest.raises(
AttachmentServiceApiException,
StorageControllerApiException,
match="Forbidden: JWT authentication error",
):
svc.request(
@@ -743,7 +742,7 @@ def test_sharding_service_tenant_conf(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
tenant_id = env.initial_tenant

http = env.attachment_service.pageserver_api()
http = env.storage_controller.pageserver_api()

default_value = "7days"
new_value = "1h"
@@ -769,4 +768,4 @@ def test_sharding_service_tenant_conf(neon_env_builder: NeonEnvBuilder):
assert readback_ps.effective_config["pitr_interval"] == default_value
assert "pitr_interval" not in readback_ps.tenant_specific_overrides

env.attachment_service.consistency_check()
env.storage_controller.consistency_check()

@@ -1011,7 +1011,7 @@ def test_eager_attach_does_not_queue_up(neon_env_builder: NeonEnvBuilder):
resp = client.tenant_status(eager_tenant)
assert resp["state"]["slug"] == "Active"

gen = env.attachment_service.attach_hook_issue(eager_tenant, env.pageserver.id)
gen = env.storage_controller.attach_hook_issue(eager_tenant, env.pageserver.id)
client.tenant_location_conf(
eager_tenant,
{
@@ -1071,7 +1071,7 @@ def test_lazy_attach_activation(neon_env_builder: NeonEnvBuilder, activation_met
# attach, it will consume the only permit because logical size calculation
# is paused.

gen = env.attachment_service.attach_hook_issue(lazy_tenant, env.pageserver.id)
gen = env.storage_controller.attach_hook_issue(lazy_tenant, env.pageserver.id)
client.tenant_location_conf(
lazy_tenant,
{