Merge 2025-04-09 main commit 'a6ff8ec3d47963616d9cef07421d9319db958e8a' into yuchen/direct-io-delta-image-layer-write

This commit is contained in:
Christian Schwarz
2025-04-11 16:17:54 +02:00
67 changed files with 2211 additions and 466 deletions

View File

@@ -19,6 +19,7 @@
!pageserver/
!pgxn/
!proxy/
!object_storage/
!storage_scrubber/
!safekeeper/
!storage_broker/

1
.gitignore vendored
View File

@@ -1,3 +1,4 @@
/artifact_cache
/pg_install
/target
/tmp_check

55
Cargo.lock generated
View File

@@ -3991,6 +3991,33 @@ dependencies = [
"memchr",
]
[[package]]
name = "object_storage"
version = "0.0.1"
dependencies = [
"anyhow",
"axum",
"axum-extra",
"camino",
"camino-tempfile",
"futures",
"http-body-util",
"itertools 0.10.5",
"jsonwebtoken",
"prometheus",
"rand 0.8.5",
"remote_storage",
"serde",
"serde_json",
"test-log",
"tokio",
"tokio-util",
"tower 0.5.2",
"tracing",
"utils",
"workspace_hack",
]
[[package]]
name = "once_cell"
version = "1.20.2"
@@ -4693,7 +4720,7 @@ dependencies = [
[[package]]
name = "postgres-protocol"
version = "0.6.6"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#1f21e7959a96a34dcfbfce1b14b73286cdadffe9"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#f3cf448febde5fd298071d54d568a9c875a7a62b"
dependencies = [
"base64 0.22.1",
"byteorder",
@@ -4727,7 +4754,7 @@ dependencies = [
[[package]]
name = "postgres-types"
version = "0.2.6"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#1f21e7959a96a34dcfbfce1b14b73286cdadffe9"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#f3cf448febde5fd298071d54d568a9c875a7a62b"
dependencies = [
"bytes",
"chrono",
@@ -6925,6 +6952,28 @@ dependencies = [
"syn 2.0.100",
]
[[package]]
name = "test-log"
version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7f46083d221181166e5b6f6b1e5f1d499f3a76888826e6cb1d057554157cd0f"
dependencies = [
"env_logger",
"test-log-macros",
"tracing-subscriber",
]
[[package]]
name = "test-log-macros"
version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "888d0c3c6db53c0fdab160d2ed5e12ba745383d3e85813f2ea0f2b1475ab553f"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.100",
]
[[package]]
name = "thiserror"
version = "1.0.69"
@@ -7172,7 +7221,7 @@ dependencies = [
[[package]]
name = "tokio-postgres"
version = "0.7.10"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#1f21e7959a96a34dcfbfce1b14b73286cdadffe9"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#f3cf448febde5fd298071d54d568a9c875a7a62b"
dependencies = [
"async-trait",
"byteorder",

View File

@@ -40,6 +40,7 @@ members = [
"libs/proxy/postgres-protocol2",
"libs/proxy/postgres-types2",
"libs/proxy/tokio-postgres2",
"object_storage",
]
[workspace.package]
@@ -208,6 +209,7 @@ tracing-opentelemetry = "0.28"
tracing-serde = "0.2.0"
tracing-subscriber = { version = "0.3", default-features = false, features = ["smallvec", "fmt", "tracing-log", "std", "env-filter", "json"] }
try-lock = "0.2.5"
test-log = { version = "0.2.17", default-features = false, features = ["log"] }
twox-hash = { version = "1.6.3", default-features = false }
typed-json = "0.1"
url = "2.2"

View File

@@ -89,6 +89,7 @@ RUN set -e \
--bin storage_broker \
--bin storage_controller \
--bin proxy \
--bin object_storage \
--bin neon_local \
--bin storage_scrubber \
--locked --release
@@ -121,6 +122,7 @@ COPY --from=build --chown=neon:neon /home/nonroot/target/release/safekeeper
COPY --from=build --chown=neon:neon /home/nonroot/target/release/storage_broker /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/storage_controller /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/proxy /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/object_storage /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/neon_local /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/storage_scrubber /usr/local/bin

View File

@@ -661,15 +661,8 @@ impl ComputeNode {
}
// Configure and start rsyslog for Postgres logs export
if self.has_feature(ComputeFeature::PostgresLogsExport) {
if let Some(ref project_id) = pspec.spec.cluster.cluster_id {
let host = PostgresLogsRsyslogConfig::default_host(project_id);
let conf = PostgresLogsRsyslogConfig::new(Some(&host));
configure_postgres_logs_export(conf)?;
} else {
warn!("not configuring rsyslog for Postgres logs export: project ID is missing")
}
}
let conf = PostgresLogsRsyslogConfig::new(pspec.spec.logs_export_host.as_deref());
configure_postgres_logs_export(conf)?;
// Launch remaining service threads
let _monitor_handle = launch_monitor(self);
@@ -1573,6 +1566,10 @@ impl ComputeNode {
});
}
// Reconfigure rsyslog for Postgres logs export
let conf = PostgresLogsRsyslogConfig::new(spec.logs_export_host.as_deref());
configure_postgres_logs_export(conf)?;
// Write new config
let pgdata_path = Path::new(&self.params.pgdata);
config::write_postgres_conf(

View File

@@ -7,7 +7,7 @@ use std::io::prelude::*;
use std::path::Path;
use compute_api::responses::TlsConfig;
use compute_api::spec::{ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, GenericOption};
use compute_api::spec::{ComputeAudit, ComputeMode, ComputeSpec, GenericOption};
use crate::pg_helpers::{
GenericOptionExt, GenericOptionsSearch, PgOptionsSerialize, escape_conf_value,
@@ -255,7 +255,7 @@ pub fn write_postgres_conf(
// We need Postgres to send logs to rsyslog so that we can forward them
// further to customers' log aggregation systems.
if spec.features.contains(&ComputeFeature::PostgresLogsExport) {
if spec.logs_export_host.is_some() {
writeln!(file, "log_destination='stderr,syslog'")?;
}

View File

@@ -306,36 +306,6 @@ paths:
schema:
$ref: "#/components/schemas/GenericError"
/configure_telemetry:
post:
tags:
- Configure
summary: Configure rsyslog
description: |
This API endpoint configures rsyslog to forward Postgres logs
to a specified otel collector.
operationId: configureTelemetry
requestBody:
required: true
content:
application/json:
schema:
type: object
properties:
logs_export_host:
type: string
description: |
Hostname and the port of the otel collector. Leave empty to disable logs forwarding.
Example: config-shy-breeze-123-collector-monitoring.neon-telemetry.svc.cluster.local:54526
responses:
204:
description: "Telemetry configured successfully"
500:
content:
application/json:
schema:
$ref: "#/components/schemas/GenericError"
components:
securitySchemes:
JWT:

View File

@@ -1,11 +1,9 @@
use std::sync::Arc;
use axum::body::Body;
use axum::extract::State;
use axum::response::Response;
use compute_api::requests::{ConfigurationRequest, ConfigureTelemetryRequest};
use compute_api::requests::ConfigurationRequest;
use compute_api::responses::{ComputeStatus, ComputeStatusResponse};
use compute_api::spec::ComputeFeature;
use http::StatusCode;
use tokio::task;
use tracing::info;
@@ -13,7 +11,6 @@ use tracing::info;
use crate::compute::{ComputeNode, ParsedSpec};
use crate::http::JsonResponse;
use crate::http::extract::Json;
use crate::rsyslog::{PostgresLogsRsyslogConfig, configure_postgres_logs_export};
// Accept spec in JSON format and request compute configuration. If anything
// goes wrong after we set the compute status to `ConfigurationPending` and
@@ -95,25 +92,3 @@ pub(in crate::http) async fn configure(
JsonResponse::success(StatusCode::OK, body)
}
pub(in crate::http) async fn configure_telemetry(
State(compute): State<Arc<ComputeNode>>,
request: Json<ConfigureTelemetryRequest>,
) -> Response {
if !compute.has_feature(ComputeFeature::PostgresLogsExport) {
return JsonResponse::error(
StatusCode::PRECONDITION_FAILED,
"Postgres logs export feature is not enabled".to_string(),
);
}
let conf = PostgresLogsRsyslogConfig::new(request.logs_export_host.as_deref());
if let Err(err) = configure_postgres_logs_export(conf) {
return JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, err.to_string());
}
Response::builder()
.status(StatusCode::NO_CONTENT)
.body(Body::from(""))
.unwrap()
}

View File

@@ -87,7 +87,6 @@ impl From<&Server> for Router<Arc<ComputeNode>> {
let authenticated_router = Router::<Arc<ComputeNode>>::new()
.route("/check_writability", post(check_writability::is_writable))
.route("/configure", post(configure::configure))
.route("/configure_telemetry", post(configure::configure_telemetry))
.route("/database_schema", get(database_schema::get_schema_dump))
.route("/dbs_and_roles", get(dbs_and_roles::get_catalog_objects))
.route("/insights", get(insights::get_insights))

View File

@@ -119,16 +119,9 @@ impl<'a> PostgresLogsRsyslogConfig<'a> {
};
Ok(config_content)
}
/// Returns the default host for otel collector that receives Postgres logs
pub fn default_host(project_id: &str) -> String {
format!(
"config-{}-collector.neon-telemetry.svc.cluster.local:10514",
project_id
)
}
}
/// Writes rsyslogd configuration for Postgres logs export and restarts rsyslog.
pub fn configure_postgres_logs_export(conf: PostgresLogsRsyslogConfig) -> Result<()> {
let new_config = conf.build()?;
let current_config = PostgresLogsRsyslogConfig::current_config()?;
@@ -261,16 +254,5 @@ mod tests {
let res = conf.build();
assert!(res.is_err());
}
{
// Verify config with default host
let host = PostgresLogsRsyslogConfig::default_host("shy-breeze-123");
let conf = PostgresLogsRsyslogConfig::new(Some(&host));
let res = conf.build();
assert!(res.is_ok());
let conf_str = res.unwrap();
assert!(conf_str.contains(r#"shy-breeze-123"#));
assert!(conf_str.contains(r#"port="10514""#));
}
}
}

View File

@@ -20,8 +20,10 @@ use compute_api::spec::ComputeMode;
use control_plane::endpoint::ComputeControlPlane;
use control_plane::local_env::{
InitForceMode, LocalEnv, NeonBroker, NeonLocalInitConf, NeonLocalInitPageserverConf,
SafekeeperConf,
ObjectStorageConf, SafekeeperConf,
};
use control_plane::object_storage::OBJECT_STORAGE_DEFAULT_PORT;
use control_plane::object_storage::ObjectStorage;
use control_plane::pageserver::PageServerNode;
use control_plane::safekeeper::SafekeeperNode;
use control_plane::storage_controller::{
@@ -39,7 +41,7 @@ use pageserver_api::controller_api::{
use pageserver_api::models::{
ShardParameters, TenantConfigRequest, TimelineCreateRequest, TimelineInfo,
};
use pageserver_api::shard::{ShardCount, ShardStripeSize, TenantShardId};
use pageserver_api::shard::{DEFAULT_STRIPE_SIZE, ShardCount, ShardStripeSize, TenantShardId};
use postgres_backend::AuthType;
use postgres_connection::parse_host_port;
use safekeeper_api::membership::SafekeeperGeneration;
@@ -91,6 +93,8 @@ enum NeonLocalCmd {
#[command(subcommand)]
Safekeeper(SafekeeperCmd),
#[command(subcommand)]
ObjectStorage(ObjectStorageCmd),
#[command(subcommand)]
Endpoint(EndpointCmd),
#[command(subcommand)]
Mappings(MappingsCmd),
@@ -454,6 +458,32 @@ enum SafekeeperCmd {
Restart(SafekeeperRestartCmdArgs),
}
#[derive(clap::Subcommand)]
#[clap(about = "Manage object storage")]
enum ObjectStorageCmd {
Start(ObjectStorageStartCmd),
Stop(ObjectStorageStopCmd),
}
#[derive(clap::Args)]
#[clap(about = "Start object storage")]
struct ObjectStorageStartCmd {
#[clap(short = 't', long, help = "timeout until we fail the command")]
#[arg(default_value = "10s")]
start_timeout: humantime::Duration,
}
#[derive(clap::Args)]
#[clap(about = "Stop object storage")]
struct ObjectStorageStopCmd {
#[arg(value_enum, default_value = "fast")]
#[clap(
short = 'm',
help = "If 'immediate', don't flush repository data at shutdown"
)]
stop_mode: StopMode,
}
#[derive(clap::Args)]
#[clap(about = "Start local safekeeper")]
struct SafekeeperStartCmdArgs {
@@ -759,6 +789,7 @@ fn main() -> Result<()> {
}
NeonLocalCmd::StorageBroker(subcmd) => rt.block_on(handle_storage_broker(&subcmd, env)),
NeonLocalCmd::Safekeeper(subcmd) => rt.block_on(handle_safekeeper(&subcmd, env)),
NeonLocalCmd::ObjectStorage(subcmd) => rt.block_on(handle_object_storage(&subcmd, env)),
NeonLocalCmd::Endpoint(subcmd) => rt.block_on(handle_endpoint(&subcmd, env)),
NeonLocalCmd::Mappings(subcmd) => handle_mappings(&subcmd, env),
};
@@ -975,6 +1006,9 @@ fn handle_init(args: &InitCmdArgs) -> anyhow::Result<LocalEnv> {
}
})
.collect(),
object_storage: ObjectStorageConf {
port: OBJECT_STORAGE_DEFAULT_PORT,
},
pg_distrib_dir: None,
neon_distrib_dir: None,
default_tenant_id: TenantId::from_array(std::array::from_fn(|_| 0)),
@@ -1083,7 +1117,7 @@ async fn handle_tenant(subcmd: &TenantCmd, env: &mut local_env::LocalEnv) -> any
stripe_size: args
.shard_stripe_size
.map(ShardStripeSize)
.unwrap_or(ShardParameters::DEFAULT_STRIPE_SIZE),
.unwrap_or(DEFAULT_STRIPE_SIZE),
},
placement_policy: args.placement_policy.clone(),
config: tenant_conf,
@@ -1396,7 +1430,7 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
vec![(parsed.0, parsed.1.unwrap_or(5432))],
// If caller is telling us what pageserver to use, this is not a tenant which is
// full managed by storage controller, therefore not sharded.
ShardParameters::DEFAULT_STRIPE_SIZE,
DEFAULT_STRIPE_SIZE,
)
} else {
// Look up the currently attached location of the tenant, and its striping metadata,
@@ -1683,6 +1717,41 @@ async fn handle_safekeeper(subcmd: &SafekeeperCmd, env: &local_env::LocalEnv) ->
Ok(())
}
async fn handle_object_storage(subcmd: &ObjectStorageCmd, env: &local_env::LocalEnv) -> Result<()> {
use ObjectStorageCmd::*;
let storage = ObjectStorage::from_env(env);
// In tests like test_forward_compatibility or test_graceful_cluster_restart
// old neon binaries (without object_storage) are present
if !storage.bin.exists() {
eprintln!(
"{} binary not found. Ignore if this is a compatibility test",
storage.bin
);
return Ok(());
}
match subcmd {
Start(ObjectStorageStartCmd { start_timeout }) => {
if let Err(e) = storage.start(start_timeout).await {
eprintln!("object_storage start failed: {e}");
exit(1);
}
}
Stop(ObjectStorageStopCmd { stop_mode }) => {
let immediate = match stop_mode {
StopMode::Fast => false,
StopMode::Immediate => true,
};
if let Err(e) = storage.stop(immediate) {
eprintln!("proxy stop failed: {e}");
exit(1);
}
}
};
Ok(())
}
async fn handle_storage_broker(subcmd: &StorageBrokerCmd, env: &local_env::LocalEnv) -> Result<()> {
match subcmd {
StorageBrokerCmd::Start(args) => {
@@ -1777,6 +1846,13 @@ async fn handle_start_all_impl(
.map_err(|e| e.context(format!("start safekeeper {}", safekeeper.id)))
});
}
js.spawn(async move {
ObjectStorage::from_env(env)
.start(&retry_timeout)
.await
.map_err(|e| e.context("start object_storage"))
});
})();
let mut errors = Vec::new();
@@ -1874,6 +1950,11 @@ async fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) {
}
}
let storage = ObjectStorage::from_env(env);
if let Err(e) = storage.stop(immediate) {
eprintln!("object_storage stop failed: {:#}", e);
}
for ps_conf in &env.pageservers {
let pageserver = PageServerNode::from_env(env, ps_conf);
if let Err(e) = pageserver.stop(immediate) {

View File

@@ -670,6 +670,7 @@ impl Endpoint {
reconfigure_concurrency: self.reconfigure_concurrency,
drop_subscriptions_before_start: self.drop_subscriptions_before_start,
audit_log_level: ComputeAudit::Disabled,
logs_export_host: None::<String>,
};
// this strange code is needed to support respec() in tests

View File

@@ -10,6 +10,7 @@ mod background_process;
pub mod broker;
pub mod endpoint;
pub mod local_env;
pub mod object_storage;
pub mod pageserver;
pub mod postgresql_conf;
pub mod safekeeper;

View File

@@ -18,6 +18,7 @@ use serde::{Deserialize, Serialize};
use utils::auth::{Claims, encode_from_key_file};
use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId};
use crate::object_storage::{OBJECT_STORAGE_REMOTE_STORAGE_DIR, ObjectStorage};
use crate::pageserver::{PAGESERVER_REMOTE_STORAGE_DIR, PageServerNode};
use crate::safekeeper::SafekeeperNode;
@@ -55,6 +56,7 @@ pub struct LocalEnv {
// used to issue tokens during e.g pg start
pub private_key_path: PathBuf,
pub public_key_path: PathBuf,
pub broker: NeonBroker,
@@ -68,6 +70,8 @@ pub struct LocalEnv {
pub safekeepers: Vec<SafekeeperConf>,
pub object_storage: ObjectStorageConf,
// Control plane upcall API for pageserver: if None, we will not run storage_controller If set, this will
// be propagated into each pageserver's configuration.
pub control_plane_api: Url,
@@ -95,6 +99,7 @@ pub struct OnDiskConfig {
pub neon_distrib_dir: PathBuf,
pub default_tenant_id: Option<TenantId>,
pub private_key_path: PathBuf,
pub public_key_path: PathBuf,
pub broker: NeonBroker,
pub storage_controller: NeonStorageControllerConf,
#[serde(
@@ -103,6 +108,7 @@ pub struct OnDiskConfig {
)]
pub pageservers: Vec<PageServerConf>,
pub safekeepers: Vec<SafekeeperConf>,
pub object_storage: ObjectStorageConf,
pub control_plane_api: Option<Url>,
pub control_plane_hooks_api: Option<Url>,
pub control_plane_compute_hook_api: Option<Url>,
@@ -136,11 +142,18 @@ pub struct NeonLocalInitConf {
pub storage_controller: Option<NeonStorageControllerConf>,
pub pageservers: Vec<NeonLocalInitPageserverConf>,
pub safekeepers: Vec<SafekeeperConf>,
pub object_storage: ObjectStorageConf,
pub control_plane_api: Option<Url>,
pub control_plane_hooks_api: Option<Url>,
pub generate_local_ssl_certs: bool,
}
#[derive(Serialize, Default, Deserialize, PartialEq, Eq, Clone, Debug)]
#[serde(default)]
pub struct ObjectStorageConf {
pub port: u16,
}
/// Broker config for cluster internal communication.
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
#[serde(default)]
@@ -398,6 +411,10 @@ impl LocalEnv {
self.pg_dir(pg_version, "lib")
}
pub fn object_storage_bin(&self) -> PathBuf {
self.neon_distrib_dir.join("object_storage")
}
pub fn pageserver_bin(&self) -> PathBuf {
self.neon_distrib_dir.join("pageserver")
}
@@ -431,6 +448,10 @@ impl LocalEnv {
self.base_data_dir.join("safekeepers").join(data_dir_name)
}
pub fn object_storage_data_dir(&self) -> PathBuf {
self.base_data_dir.join("object_storage")
}
pub fn get_pageserver_conf(&self, id: NodeId) -> anyhow::Result<&PageServerConf> {
if let Some(conf) = self.pageservers.iter().find(|node| node.id == id) {
Ok(conf)
@@ -582,6 +603,7 @@ impl LocalEnv {
neon_distrib_dir,
default_tenant_id,
private_key_path,
public_key_path,
broker,
storage_controller,
pageservers,
@@ -591,6 +613,7 @@ impl LocalEnv {
control_plane_compute_hook_api: _,
branch_name_mappings,
generate_local_ssl_certs,
object_storage,
} = on_disk_config;
LocalEnv {
base_data_dir: repopath.to_owned(),
@@ -598,6 +621,7 @@ impl LocalEnv {
neon_distrib_dir,
default_tenant_id,
private_key_path,
public_key_path,
broker,
storage_controller,
pageservers,
@@ -606,6 +630,7 @@ impl LocalEnv {
control_plane_hooks_api,
branch_name_mappings,
generate_local_ssl_certs,
object_storage,
}
};
@@ -705,6 +730,7 @@ impl LocalEnv {
neon_distrib_dir: self.neon_distrib_dir.clone(),
default_tenant_id: self.default_tenant_id,
private_key_path: self.private_key_path.clone(),
public_key_path: self.public_key_path.clone(),
broker: self.broker.clone(),
storage_controller: self.storage_controller.clone(),
pageservers: vec![], // it's skip_serializing anyway
@@ -714,6 +740,7 @@ impl LocalEnv {
control_plane_compute_hook_api: None,
branch_name_mappings: self.branch_name_mappings.clone(),
generate_local_ssl_certs: self.generate_local_ssl_certs,
object_storage: self.object_storage.clone(),
},
)
}
@@ -797,6 +824,7 @@ impl LocalEnv {
control_plane_api,
generate_local_ssl_certs,
control_plane_hooks_api,
object_storage,
} = conf;
// Find postgres binaries.
@@ -828,6 +856,7 @@ impl LocalEnv {
)
.context("generate auth keys")?;
let private_key_path = PathBuf::from("auth_private_key.pem");
let public_key_path = PathBuf::from("auth_public_key.pem");
// create the runtime type because the remaining initialization code below needs
// a LocalEnv instance op operation
@@ -838,6 +867,7 @@ impl LocalEnv {
neon_distrib_dir,
default_tenant_id: Some(default_tenant_id),
private_key_path,
public_key_path,
broker,
storage_controller: storage_controller.unwrap_or_default(),
pageservers: pageservers.iter().map(Into::into).collect(),
@@ -846,6 +876,7 @@ impl LocalEnv {
control_plane_hooks_api,
branch_name_mappings: Default::default(),
generate_local_ssl_certs,
object_storage,
};
if generate_local_ssl_certs {
@@ -873,8 +904,13 @@ impl LocalEnv {
.context("pageserver init failed")?;
}
ObjectStorage::from_env(&env)
.init()
.context("object storage init failed")?;
// setup remote remote location for default LocalFs remote storage
std::fs::create_dir_all(env.base_data_dir.join(PAGESERVER_REMOTE_STORAGE_DIR))?;
std::fs::create_dir_all(env.base_data_dir.join(OBJECT_STORAGE_REMOTE_STORAGE_DIR))?;
env.persist_config()
}

View File

@@ -0,0 +1,107 @@
use crate::background_process::{self, start_process, stop_process};
use crate::local_env::LocalEnv;
use anyhow::anyhow;
use anyhow::{Context, Result};
use camino::Utf8PathBuf;
use std::io::Write;
use std::time::Duration;
/// Directory within .neon which will be used by default for LocalFs remote storage.
pub const OBJECT_STORAGE_REMOTE_STORAGE_DIR: &str = "local_fs_remote_storage/object_storage";
pub const OBJECT_STORAGE_DEFAULT_PORT: u16 = 9993;
pub struct ObjectStorage {
pub bin: Utf8PathBuf,
pub data_dir: Utf8PathBuf,
pub pemfile: Utf8PathBuf,
pub port: u16,
}
impl ObjectStorage {
pub fn from_env(env: &LocalEnv) -> ObjectStorage {
ObjectStorage {
bin: Utf8PathBuf::from_path_buf(env.object_storage_bin()).unwrap(),
data_dir: Utf8PathBuf::from_path_buf(env.object_storage_data_dir()).unwrap(),
pemfile: Utf8PathBuf::from_path_buf(env.public_key_path.clone()).unwrap(),
port: env.object_storage.port,
}
}
fn config_path(&self) -> Utf8PathBuf {
self.data_dir.join("object_storage.json")
}
fn listen_addr(&self) -> Utf8PathBuf {
format!("127.0.0.1:{}", self.port).into()
}
pub fn init(&self) -> Result<()> {
println!("Initializing object storage in {:?}", self.data_dir);
let parent = self.data_dir.parent().unwrap();
#[derive(serde::Serialize)]
struct Cfg {
listen: Utf8PathBuf,
pemfile: Utf8PathBuf,
local_path: Utf8PathBuf,
r#type: String,
}
let cfg = Cfg {
listen: self.listen_addr(),
pemfile: parent.join(self.pemfile.clone()),
local_path: parent.join(OBJECT_STORAGE_REMOTE_STORAGE_DIR),
r#type: "LocalFs".to_string(),
};
std::fs::create_dir_all(self.config_path().parent().unwrap())?;
std::fs::write(self.config_path(), serde_json::to_string(&cfg)?)
.context("write object storage config")?;
Ok(())
}
pub async fn start(&self, retry_timeout: &Duration) -> Result<()> {
println!("Starting s3 proxy at {}", self.listen_addr());
std::io::stdout().flush().context("flush stdout")?;
let process_status_check = || async {
tokio::time::sleep(Duration::from_millis(500)).await;
let res = reqwest::Client::new()
.get(format!("http://{}/metrics", self.listen_addr()))
.send()
.await;
match res {
Ok(response) if response.status().is_success() => Ok(true),
Ok(_) => Err(anyhow!("Failed to query /metrics")),
Err(e) => Err(anyhow!("Failed to check node status: {e}")),
}
};
let res = start_process(
"object_storage",
&self.data_dir.clone().into_std_path_buf(),
&self.bin.clone().into_std_path_buf(),
vec![self.config_path().to_string()],
vec![("RUST_LOG".into(), "debug".into())],
background_process::InitialPidFile::Create(self.pid_file()),
retry_timeout,
process_status_check,
)
.await;
if res.is_err() {
eprintln!("Logs:\n{}", std::fs::read_to_string(self.log_file())?);
}
res
}
pub fn stop(&self, immediate: bool) -> anyhow::Result<()> {
stop_process(immediate, "object_storage", &self.pid_file())
}
fn log_file(&self) -> Utf8PathBuf {
self.data_dir.join("object_storage.log")
}
fn pid_file(&self) -> Utf8PathBuf {
self.data_dir.join("object_storage.pid")
}
}

View File

@@ -151,7 +151,7 @@ Example body:
```
{
"tenant_id": "1f359dd625e519a1a4e8d7509690f6fc",
"stripe_size": 32768,
"stripe_size": 2048,
"shards": [
{"node_id": 344, "shard_number": 0},
{"node_id": 722, "shard_number": 1},

View File

@@ -30,9 +30,3 @@ pub struct SetRoleGrantsRequest {
pub privileges: Vec<Privilege>,
pub role: PgIdent,
}
/// Request of the /configure_telemetry API
#[derive(Debug, Deserialize, Serialize)]
pub struct ConfigureTelemetryRequest {
pub logs_export_host: Option<String>,
}

View File

@@ -168,6 +168,10 @@ pub struct ComputeSpec {
/// Extensions should be present in shared_preload_libraries
#[serde(default)]
pub audit_log_level: ComputeAudit,
/// Hostname and the port of the otel collector. Leave empty to disable Postgres logs forwarding.
/// Example: config-shy-breeze-123-collector-monitoring.neon-telemetry.svc.cluster.local:10514
pub logs_export_host: Option<String>,
}
/// Feature flag to signal `compute_ctl` to enable certain experimental functionality.
@@ -179,9 +183,6 @@ pub enum ComputeFeature {
/// track short-lived connections as user activity.
ActivityMonitorExperimental,
/// Allow to configure rsyslog for Postgres logs export
PostgresLogsExport,
/// This is a special feature flag that is used to represent unknown feature flags.
/// Basically all unknown to enum flags are represented as this one. See unit test
/// `parse_unknown_features()` for more details.

View File

@@ -613,8 +613,7 @@ mod tests {
use rand::{RngCore, SeedableRng};
use super::*;
use crate::models::ShardParameters;
use crate::shard::{ShardCount, ShardNumber};
use crate::shard::{DEFAULT_STRIPE_SIZE, ShardCount, ShardNumber, ShardStripeSize};
// Helper function to create a key range.
//
@@ -964,12 +963,8 @@ mod tests {
}
#[test]
fn sharded_range_relation_gap() {
let shard_identity = ShardIdentity::new(
ShardNumber(0),
ShardCount::new(4),
ShardParameters::DEFAULT_STRIPE_SIZE,
)
.unwrap();
let shard_identity =
ShardIdentity::new(ShardNumber(0), ShardCount::new(4), DEFAULT_STRIPE_SIZE).unwrap();
let range = ShardedRange::new(
Range {
@@ -985,12 +980,8 @@ mod tests {
#[test]
fn shard_identity_keyspaces_single_key() {
let shard_identity = ShardIdentity::new(
ShardNumber(1),
ShardCount::new(4),
ShardParameters::DEFAULT_STRIPE_SIZE,
)
.unwrap();
let shard_identity =
ShardIdentity::new(ShardNumber(1), ShardCount::new(4), DEFAULT_STRIPE_SIZE).unwrap();
let range = ShardedRange::new(
Range {
@@ -1034,12 +1025,8 @@ mod tests {
#[test]
fn shard_identity_keyspaces_forkno_gap() {
let shard_identity = ShardIdentity::new(
ShardNumber(1),
ShardCount::new(4),
ShardParameters::DEFAULT_STRIPE_SIZE,
)
.unwrap();
let shard_identity =
ShardIdentity::new(ShardNumber(1), ShardCount::new(4), DEFAULT_STRIPE_SIZE).unwrap();
let range = ShardedRange::new(
Range {
@@ -1061,7 +1048,7 @@ mod tests {
let shard_identity = ShardIdentity::new(
ShardNumber(shard_number),
ShardCount::new(4),
ShardParameters::DEFAULT_STRIPE_SIZE,
DEFAULT_STRIPE_SIZE,
)
.unwrap();
@@ -1144,37 +1131,44 @@ mod tests {
/// for a single tenant.
#[test]
fn sharded_range_fragment_simple() {
const SHARD_COUNT: u8 = 4;
const STRIPE_SIZE: u32 = DEFAULT_STRIPE_SIZE.0;
let shard_identity = ShardIdentity::new(
ShardNumber(0),
ShardCount::new(4),
ShardParameters::DEFAULT_STRIPE_SIZE,
ShardCount::new(SHARD_COUNT),
ShardStripeSize(STRIPE_SIZE),
)
.unwrap();
// A range which we happen to know covers exactly one stripe which belongs to this shard
let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
let input_end = Key::from_hex("000000067f00000001000000ae0000008000").unwrap();
let mut input_end = input_start;
input_end.field6 += STRIPE_SIZE; // field6 is block number
// Ask for stripe_size blocks, we get the whole stripe
assert_eq!(
do_fragment(input_start, input_end, &shard_identity, 32768),
(32768, vec![(32768, input_start..input_end)])
do_fragment(input_start, input_end, &shard_identity, STRIPE_SIZE),
(STRIPE_SIZE, vec![(STRIPE_SIZE, input_start..input_end)])
);
// Ask for more, we still get the whole stripe
assert_eq!(
do_fragment(input_start, input_end, &shard_identity, 10000000),
(32768, vec![(32768, input_start..input_end)])
do_fragment(input_start, input_end, &shard_identity, 10 * STRIPE_SIZE),
(STRIPE_SIZE, vec![(STRIPE_SIZE, input_start..input_end)])
);
// Ask for target_nblocks of half the stripe size, we get two halves
assert_eq!(
do_fragment(input_start, input_end, &shard_identity, 16384),
do_fragment(input_start, input_end, &shard_identity, STRIPE_SIZE / 2),
(
32768,
STRIPE_SIZE,
vec![
(16384, input_start..input_start.add(16384)),
(16384, input_start.add(16384)..input_end)
(
STRIPE_SIZE / 2,
input_start..input_start.add(STRIPE_SIZE / 2)
),
(STRIPE_SIZE / 2, input_start.add(STRIPE_SIZE / 2)..input_end)
]
)
);
@@ -1182,40 +1176,53 @@ mod tests {
#[test]
fn sharded_range_fragment_multi_stripe() {
const SHARD_COUNT: u8 = 4;
const STRIPE_SIZE: u32 = DEFAULT_STRIPE_SIZE.0;
const RANGE_SIZE: u32 = SHARD_COUNT as u32 * STRIPE_SIZE;
let shard_identity = ShardIdentity::new(
ShardNumber(0),
ShardCount::new(4),
ShardParameters::DEFAULT_STRIPE_SIZE,
ShardCount::new(SHARD_COUNT),
ShardStripeSize(STRIPE_SIZE),
)
.unwrap();
// A range which covers multiple stripes, exactly one of which belongs to the current shard.
let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
let input_end = Key::from_hex("000000067f00000001000000ae0000020000").unwrap();
let mut input_end = input_start;
input_end.field6 += RANGE_SIZE; // field6 is block number
// Ask for all the blocks, get a fragment that covers the whole range but reports
// its size to be just the blocks belonging to our shard.
assert_eq!(
do_fragment(input_start, input_end, &shard_identity, 131072),
(32768, vec![(32768, input_start..input_end)])
do_fragment(input_start, input_end, &shard_identity, RANGE_SIZE),
(STRIPE_SIZE, vec![(STRIPE_SIZE, input_start..input_end)])
);
// Ask for a sub-stripe quantity
// Ask for a sub-stripe quantity that results in 3 fragments.
let limit = STRIPE_SIZE / 3 + 1;
assert_eq!(
do_fragment(input_start, input_end, &shard_identity, 16000),
do_fragment(input_start, input_end, &shard_identity, limit),
(
32768,
STRIPE_SIZE,
vec![
(16000, input_start..input_start.add(16000)),
(16000, input_start.add(16000)..input_start.add(32000)),
(768, input_start.add(32000)..input_end),
(limit, input_start..input_start.add(limit)),
(limit, input_start.add(limit)..input_start.add(2 * limit)),
(
STRIPE_SIZE - 2 * limit,
input_start.add(2 * limit)..input_end
),
]
)
);
// Try on a range that starts slightly after our owned stripe
assert_eq!(
do_fragment(input_start.add(1), input_end, &shard_identity, 131072),
(32767, vec![(32767, input_start.add(1)..input_end)])
do_fragment(input_start.add(1), input_end, &shard_identity, RANGE_SIZE),
(
STRIPE_SIZE - 1,
vec![(STRIPE_SIZE - 1, input_start.add(1)..input_end)]
)
);
}
@@ -1223,32 +1230,40 @@ mod tests {
/// a previous relation.
#[test]
fn sharded_range_fragment_starting_from_logical_size() {
const SHARD_COUNT: u8 = 4;
const STRIPE_SIZE: u32 = DEFAULT_STRIPE_SIZE.0;
const RANGE_SIZE: u32 = SHARD_COUNT as u32 * STRIPE_SIZE;
let input_start = Key::from_hex("000000067f00000001000000ae00ffffffff").unwrap();
let input_end = Key::from_hex("000000067f00000001000000ae0100008000").unwrap();
let mut input_end = Key::from_hex("000000067f00000001000000ae0100000000").unwrap();
input_end.field6 += RANGE_SIZE; // field6 is block number
// Shard 0 owns the first stripe in the relation, and the preceding logical size is shard local too
let shard_identity = ShardIdentity::new(
ShardNumber(0),
ShardCount::new(4),
ShardParameters::DEFAULT_STRIPE_SIZE,
ShardCount::new(SHARD_COUNT),
ShardStripeSize(STRIPE_SIZE),
)
.unwrap();
assert_eq!(
do_fragment(input_start, input_end, &shard_identity, 0x10000),
(0x8001, vec![(0x8001, input_start..input_end)])
do_fragment(input_start, input_end, &shard_identity, 2 * STRIPE_SIZE),
(
STRIPE_SIZE + 1,
vec![(STRIPE_SIZE + 1, input_start..input_end)]
)
);
// Shard 1 does not own the first stripe in the relation, but it does own the logical size (all shards
// store all logical sizes)
let shard_identity = ShardIdentity::new(
ShardNumber(1),
ShardCount::new(4),
ShardParameters::DEFAULT_STRIPE_SIZE,
ShardCount::new(SHARD_COUNT),
ShardStripeSize(STRIPE_SIZE),
)
.unwrap();
assert_eq!(
do_fragment(input_start, input_end, &shard_identity, 0x10000),
(0x1, vec![(0x1, input_start..input_end)])
do_fragment(input_start, input_end, &shard_identity, 2 * STRIPE_SIZE),
(1, vec![(1, input_start..input_end)])
);
}
@@ -1284,12 +1299,8 @@ mod tests {
);
// Same, but using a sharded identity
let shard_identity = ShardIdentity::new(
ShardNumber(0),
ShardCount::new(4),
ShardParameters::DEFAULT_STRIPE_SIZE,
)
.unwrap();
let shard_identity =
ShardIdentity::new(ShardNumber(0), ShardCount::new(4), DEFAULT_STRIPE_SIZE).unwrap();
assert_eq!(
do_fragment(input_start, input_end, &shard_identity, 0x8000),
(u32::MAX, vec![(u32::MAX, input_start..input_end),])
@@ -1331,7 +1342,7 @@ mod tests {
ShardIdentity::new(
ShardNumber((prng.next_u32() % shard_count) as u8),
ShardCount::new(shard_count as u8),
ShardParameters::DEFAULT_STRIPE_SIZE,
DEFAULT_STRIPE_SIZE,
)
.unwrap()
};

View File

@@ -26,7 +26,7 @@ use utils::{completion, serde_system_time};
use crate::config::Ratio;
use crate::key::{CompactKey, Key};
use crate::reltag::RelTag;
use crate::shard::{ShardCount, ShardStripeSize, TenantShardId};
use crate::shard::{DEFAULT_STRIPE_SIZE, ShardCount, ShardStripeSize, TenantShardId};
/// The state of a tenant in this pageserver.
///
@@ -80,10 +80,22 @@ pub enum TenantState {
///
/// Transitions out of this state are possible through `set_broken()`.
Stopping {
/// The barrier can be used to wait for shutdown to complete. The first caller to set
/// Some(Barrier) is responsible for driving shutdown to completion. Subsequent callers
/// will wait for the first caller's existing barrier.
///
/// None is set when an attach is cancelled, to signal to shutdown that the attach has in
/// fact cancelled:
///
/// 1. `shutdown` sees `TenantState::Attaching`, and cancels the tenant.
/// 2. `attach` sets `TenantState::Stopping(None)` and exits.
/// 3. `set_stopping` waits for `TenantState::Stopping(None)` and sets
/// `TenantState::Stopping(Some)` to claim the barrier as the shutdown owner.
//
// Because of https://github.com/serde-rs/serde/issues/2105 this has to be a named field,
// otherwise it will not be skipped during deserialization
#[serde(skip)]
progress: completion::Barrier,
progress: Option<completion::Barrier>,
},
/// The tenant is recognized by the pageserver, but can no longer be used for
/// any operations.
@@ -426,8 +438,6 @@ pub struct ShardParameters {
}
impl ShardParameters {
pub const DEFAULT_STRIPE_SIZE: ShardStripeSize = ShardStripeSize(256 * 1024 / 8);
pub fn is_unsharded(&self) -> bool {
self.count.is_unsharded()
}
@@ -437,7 +447,7 @@ impl Default for ShardParameters {
fn default() -> Self {
Self {
count: ShardCount::new(0),
stripe_size: Self::DEFAULT_STRIPE_SIZE,
stripe_size: DEFAULT_STRIPE_SIZE,
}
}
}
@@ -1668,6 +1678,7 @@ pub struct SecondaryProgress {
pub struct TenantScanRemoteStorageShard {
pub tenant_shard_id: TenantShardId,
pub generation: Option<u32>,
pub stripe_size: Option<ShardStripeSize>,
}
#[derive(Serialize, Deserialize, Debug, Default)]
@@ -2719,10 +2730,15 @@ mod tests {
"Activating",
),
(line!(), TenantState::Active, "Active"),
(
line!(),
TenantState::Stopping { progress: None },
"Stopping",
),
(
line!(),
TenantState::Stopping {
progress: utils::completion::Barrier::default(),
progress: Some(completion::Barrier::default()),
},
"Stopping",
),

View File

@@ -58,6 +58,8 @@ pub enum NeonWalRecord {
/// to true. This record does not need the history WALs to reconstruct. See [`NeonWalRecord::will_init`] and
/// its references in `timeline.rs`.
will_init: bool,
/// Only append the record if the current image is the same as the one specified in this field.
only_if: Option<String>,
},
}
@@ -81,6 +83,17 @@ impl NeonWalRecord {
append: s.as_ref().to_string(),
clear: false,
will_init: false,
only_if: None,
}
}
#[cfg(feature = "testing")]
pub fn wal_append_conditional(s: impl AsRef<str>, only_if: impl AsRef<str>) -> Self {
Self::Test {
append: s.as_ref().to_string(),
clear: false,
will_init: false,
only_if: Some(only_if.as_ref().to_string()),
}
}
@@ -90,6 +103,7 @@ impl NeonWalRecord {
append: s.as_ref().to_string(),
clear: true,
will_init: false,
only_if: None,
}
}
@@ -99,6 +113,7 @@ impl NeonWalRecord {
append: s.as_ref().to_string(),
clear: true,
will_init: true,
only_if: None,
}
}
}

View File

@@ -78,6 +78,12 @@ impl Default for ShardStripeSize {
}
}
impl std::fmt::Display for ShardStripeSize {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}
/// Layout version: for future upgrades where we might change how the key->shard mapping works
#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Hash, Debug)]
pub struct ShardLayout(u8);
@@ -86,8 +92,11 @@ const LAYOUT_V1: ShardLayout = ShardLayout(1);
/// ShardIdentity uses a magic layout value to indicate if it is unusable
const LAYOUT_BROKEN: ShardLayout = ShardLayout(255);
/// Default stripe size in pages: 256MiB divided by 8kiB page size.
const DEFAULT_STRIPE_SIZE: ShardStripeSize = ShardStripeSize(256 * 1024 / 8);
/// The default stripe size in pages. 16 MiB divided by 8 kiB page size.
///
/// A lower stripe size distributes ingest load better across shards, but reduces IO amortization.
/// 16 MiB appears to be a reasonable balance: <https://github.com/neondatabase/neon/pull/10510>.
pub const DEFAULT_STRIPE_SIZE: ShardStripeSize = ShardStripeSize(16 * 1024 / 8);
#[derive(thiserror::Error, Debug, PartialEq, Eq)]
pub enum ShardConfigError {
@@ -537,7 +546,7 @@ mod tests {
field6: 0x7d06,
};
let shard = key_to_shard_number(ShardCount(10), DEFAULT_STRIPE_SIZE, &key);
let shard = key_to_shard_number(ShardCount(10), ShardStripeSize(32768), &key);
assert_eq!(shard, ShardNumber(8));
}

View File

@@ -28,7 +28,7 @@ toml_edit.workspace = true
tracing.workspace = true
scopeguard.workspace = true
metrics.workspace = true
utils.workspace = true
utils = { path = "../utils", default-features = false }
pin-project-lite.workspace = true
azure_core.workspace = true

View File

@@ -5,7 +5,8 @@ edition.workspace = true
license.workspace = true
[features]
default = []
default = ["rename_noreplace"]
rename_noreplace = []
# Enables test-only APIs, incuding failpoints. In particular, enables the `fail_point!` macro,
# which adds some runtime cost to run tests on outage conditions
testing = ["fail/failpoints"]
@@ -35,7 +36,7 @@ serde_with.workspace = true
serde_json.workspace = true
signal-hook.workspace = true
thiserror.workspace = true
tokio.workspace = true
tokio = { workspace = true, features = ["signal"] }
tokio-tar.workspace = true
tokio-util.workspace = true
toml_edit = { workspace = true, features = ["serde"] }

View File

@@ -3,7 +3,9 @@ use std::{fs, io, path::Path};
use anyhow::Context;
#[cfg(feature = "rename_noreplace")]
mod rename_noreplace;
#[cfg(feature = "rename_noreplace")]
pub use rename_noreplace::rename_noreplace;
pub trait PathExt {

View File

@@ -8,7 +8,7 @@ pub fn rename_noreplace<P1: ?Sized + NixPath, P2: ?Sized + NixPath>(
dst: &P2,
) -> nix::Result<()> {
{
#[cfg(target_os = "linux")]
#[cfg(all(target_os = "linux", target_env = "gnu"))]
{
nix::fcntl::renameat2(
None,
@@ -29,7 +29,7 @@ pub fn rename_noreplace<P1: ?Sized + NixPath, P2: ?Sized + NixPath>(
})??;
nix::errno::Errno::result(res).map(drop)
}
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
#[cfg(not(any(all(target_os = "linux", target_env = "gnu"), target_os = "macos")))]
{
std::compile_error!("OS does not support no-replace renames");
}

View File

@@ -1,6 +1,8 @@
pub use signal_hook::consts::TERM_SIGNALS;
pub use signal_hook::consts::signal::*;
use signal_hook::iterator::Signals;
use tokio::signal::unix::{SignalKind, signal};
use tracing::info;
pub enum Signal {
Quit,
@@ -36,3 +38,30 @@ impl ShutdownSignals {
Ok(())
}
}
/// Runs in a loop since we want to be responsive to multiple signals
/// even after triggering shutdown (e.g. a SIGQUIT after a slow SIGTERM shutdown)
/// <https://github.com/neondatabase/neon/issues/9740>
pub async fn signal_handler(token: tokio_util::sync::CancellationToken) {
let mut sigint = signal(SignalKind::interrupt()).unwrap();
let mut sigterm = signal(SignalKind::terminate()).unwrap();
let mut sigquit = signal(SignalKind::quit()).unwrap();
loop {
let signal = tokio::select! {
_ = sigquit.recv() => {
info!("Got signal SIGQUIT. Terminating in immediate shutdown mode.");
std::process::exit(111);
}
_ = sigint.recv() => "SIGINT",
_ = sigterm.recv() => "SIGTERM",
};
if !token.is_cancelled() {
info!("Got signal {signal}. Terminating gracefully in fast shutdown mode.");
token.cancel();
} else {
info!("Got signal {signal}. Already shutting down.");
}
}
}

28
object_storage/Cargo.toml Normal file
View File

@@ -0,0 +1,28 @@
[package]
name = "object_storage"
version = "0.0.1"
edition.workspace = true
license.workspace = true
[dependencies]
anyhow.workspace = true
axum-extra.workspace = true
axum.workspace = true
camino.workspace = true
futures.workspace = true
jsonwebtoken.workspace = true
prometheus.workspace = true
remote_storage.workspace = true
serde.workspace = true
serde_json.workspace = true
tokio-util.workspace = true
tokio.workspace = true
tracing.workspace = true
utils = { path = "../libs/utils", default-features = false }
workspace_hack.workspace = true
[dev-dependencies]
camino-tempfile.workspace = true
http-body-util.workspace = true
itertools.workspace = true
rand.workspace = true
test-log.workspace = true
tower.workspace = true

561
object_storage/src/app.rs Normal file
View File

@@ -0,0 +1,561 @@
use anyhow::anyhow;
use axum::body::{Body, Bytes};
use axum::response::{IntoResponse, Response};
use axum::{Router, http::StatusCode};
use object_storage::{PrefixS3Path, S3Path, Storage, bad_request, internal_error, not_found, ok};
use remote_storage::TimeoutOrCancel;
use remote_storage::{DownloadError, DownloadOpts, GenericRemoteStorage, RemotePath};
use std::{sync::Arc, time::SystemTime, time::UNIX_EPOCH};
use tokio_util::sync::CancellationToken;
use tracing::{error, info};
use utils::backoff::retry;
pub fn app(state: Arc<Storage>) -> Router<()> {
use axum::routing::{delete as _delete, get as _get};
let delete_prefix = _delete(delete_prefix);
Router::new()
.route(
"/{tenant_id}/{timeline_id}/{endpoint_id}/{*path}",
_get(get).put(set).delete(delete),
)
.route(
"/{tenant_id}/{timeline_id}/{endpoint_id}",
delete_prefix.clone(),
)
.route("/{tenant_id}/{timeline_id}", delete_prefix.clone())
.route("/{tenant_id}", delete_prefix)
.route("/metrics", _get(metrics))
.route("/status", _get(async || StatusCode::OK.into_response()))
.with_state(state)
}
type Result = anyhow::Result<Response, Response>;
type State = axum::extract::State<Arc<Storage>>;
const CONTENT_TYPE: &str = "content-type";
const APPLICATION_OCTET_STREAM: &str = "application/octet-stream";
const WARN_THRESHOLD: u32 = 3;
const MAX_RETRIES: u32 = 10;
async fn metrics() -> Result {
prometheus::TextEncoder::new()
.encode_to_string(&prometheus::gather())
.map(|s| s.into_response())
.map_err(|e| internal_error(e, "/metrics", "collecting metrics"))
}
async fn get(S3Path { path }: S3Path, state: State) -> Result {
info!(%path, "downloading");
let download_err = |e| {
if let DownloadError::NotFound = e {
info!(%path, %e, "downloading"); // 404 is not an issue of _this_ service
return not_found(&path);
}
internal_error(e, &path, "downloading")
};
let cancel = state.cancel.clone();
let opts = &DownloadOpts::default();
let stream = retry(
async || state.storage.download(&path, opts, &cancel).await,
DownloadError::is_permanent,
WARN_THRESHOLD,
MAX_RETRIES,
"downloading",
&cancel,
)
.await
.unwrap_or(Err(DownloadError::Cancelled))
.map_err(download_err)?
.download_stream;
Response::builder()
.status(StatusCode::OK)
.header(CONTENT_TYPE, APPLICATION_OCTET_STREAM)
.body(Body::from_stream(stream))
.map_err(|e| internal_error(e, path, "reading response"))
}
// Best solution for files is multipart upload, but remote_storage doesn't support it,
// so we can either read Bytes in memory and push at once or forward BodyDataStream to
// remote_storage. The latter may seem more peformant, but BodyDataStream doesn't have a
// guaranteed size() which may produce issues while uploading to s3.
// So, currently we're going with an in-memory copy plus a boundary to prevent uploading
// very large files.
async fn set(S3Path { path }: S3Path, state: State, bytes: Bytes) -> Result {
info!(%path, "uploading");
let request_len = bytes.len();
let max_len = state.max_upload_file_limit;
if request_len > max_len {
return Err(bad_request(
anyhow!("File size {request_len} exceeds max {max_len}"),
"uploading",
));
}
let cancel = state.cancel.clone();
let fun = async || {
let stream = bytes_to_stream(bytes.clone());
state
.storage
.upload(stream, request_len, &path, None, &cancel)
.await
};
retry(
fun,
TimeoutOrCancel::caused_by_cancel,
WARN_THRESHOLD,
MAX_RETRIES,
"uploading",
&cancel,
)
.await
.unwrap_or(Err(anyhow!("uploading cancelled")))
.map_err(|e| internal_error(e, path, "reading response"))?;
Ok(ok())
}
async fn delete(S3Path { path }: S3Path, state: State) -> Result {
info!(%path, "deleting");
let cancel = state.cancel.clone();
retry(
async || state.storage.delete(&path, &cancel).await,
TimeoutOrCancel::caused_by_cancel,
WARN_THRESHOLD,
MAX_RETRIES,
"deleting",
&cancel,
)
.await
.unwrap_or(Err(anyhow!("deleting cancelled")))
.map_err(|e| internal_error(e, path, "deleting"))?;
Ok(ok())
}
async fn delete_prefix(PrefixS3Path { path }: PrefixS3Path, state: State) -> Result {
info!(%path, "deleting prefix");
let cancel = state.cancel.clone();
retry(
async || state.storage.delete_prefix(&path, &cancel).await,
TimeoutOrCancel::caused_by_cancel,
WARN_THRESHOLD,
MAX_RETRIES,
"deleting prefix",
&cancel,
)
.await
.unwrap_or(Err(anyhow!("deleting prefix cancelled")))
.map_err(|e| internal_error(e, path, "deleting prefix"))?;
Ok(ok())
}
pub async fn check_storage_permissions(
client: &GenericRemoteStorage,
cancel: CancellationToken,
) -> anyhow::Result<()> {
info!("storage permissions check");
// as_nanos() as multiple instances proxying same bucket may be started at once
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)?
.as_nanos()
.to_string();
let path = RemotePath::from_string(&format!("write_access_{now}"))?;
info!(%path, "uploading");
let body = now.to_string();
let stream = bytes_to_stream(Bytes::from(body.clone()));
client
.upload(stream, body.len(), &path, None, &cancel)
.await?;
use tokio::io::AsyncReadExt;
info!(%path, "downloading");
let download_opts = DownloadOpts {
kind: remote_storage::DownloadKind::Small,
..Default::default()
};
let mut body_read_buf = Vec::new();
let stream = client
.download(&path, &download_opts, &cancel)
.await?
.download_stream;
tokio_util::io::StreamReader::new(stream)
.read_to_end(&mut body_read_buf)
.await?;
let body_read = String::from_utf8(body_read_buf)?;
if body != body_read {
error!(%body, %body_read, "File contents do not match");
anyhow::bail!("Read back file doesn't match original")
}
info!(%path, "removing");
client.delete(&path, &cancel).await
}
fn bytes_to_stream(bytes: Bytes) -> impl futures::Stream<Item = std::io::Result<Bytes>> {
futures::stream::once(futures::future::ready(Ok(bytes)))
}
#[cfg(test)]
mod tests {
use super::*;
use axum::{body::Body, extract::Request, response::Response};
use http_body_util::BodyExt;
use itertools::iproduct;
use std::env::var;
use std::sync::Arc;
use std::time::Duration;
use test_log::test as testlog;
use tower::{Service, util::ServiceExt};
use utils::id::{TenantId, TimelineId};
// see libs/remote_storage/tests/test_real_s3.rs
const REAL_S3_ENV: &str = "ENABLE_REAL_S3_REMOTE_STORAGE";
const REAL_S3_BUCKET: &str = "REMOTE_STORAGE_S3_BUCKET";
const REAL_S3_REGION: &str = "REMOTE_STORAGE_S3_REGION";
async fn proxy() -> (Storage, Option<camino_tempfile::Utf8TempDir>) {
let cancel = CancellationToken::new();
let (dir, storage) = if var(REAL_S3_ENV).is_err() {
// tests execute in parallel and we need a new directory for each of them
let dir = camino_tempfile::tempdir().unwrap();
let fs =
remote_storage::LocalFs::new(dir.path().into(), Duration::from_secs(5)).unwrap();
(Some(dir), GenericRemoteStorage::LocalFs(fs))
} else {
// test_real_s3::create_s3_client is hard to reference, reimplementing here
let millis = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis();
use rand::Rng;
let random = rand::thread_rng().r#gen::<u32>();
let s3_config = remote_storage::S3Config {
bucket_name: var(REAL_S3_BUCKET).unwrap(),
bucket_region: var(REAL_S3_REGION).unwrap(),
prefix_in_bucket: Some(format!("test_{millis}_{random:08x}/")),
endpoint: None,
concurrency_limit: std::num::NonZeroUsize::new(100).unwrap(),
max_keys_per_list_response: None,
upload_storage_class: None,
};
let bucket = remote_storage::S3Bucket::new(&s3_config, Duration::from_secs(1))
.await
.unwrap();
(None, GenericRemoteStorage::AwsS3(Arc::new(bucket)))
};
let proxy = Storage {
auth: object_storage::JwtAuth::new(TEST_PUB_KEY_ED25519).unwrap(),
storage,
cancel: cancel.clone(),
max_upload_file_limit: usize::MAX,
};
check_storage_permissions(&proxy.storage, cancel)
.await
.unwrap();
(proxy, dir)
}
// see libs/utils/src/auth.rs
const TEST_PUB_KEY_ED25519: &[u8] = b"
-----BEGIN PUBLIC KEY-----
MCowBQYDK2VwAyEARYwaNBayR+eGI0iXB4s3QxE3Nl2g1iWbr6KtLWeVD/w=
-----END PUBLIC KEY-----
";
const TEST_PRIV_KEY_ED25519: &[u8] = br#"
-----BEGIN PRIVATE KEY-----
MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
-----END PRIVATE KEY-----
"#;
async fn request(req: Request<Body>) -> Response<Body> {
let (proxy, _) = proxy().await;
app(Arc::new(proxy))
.into_service()
.oneshot(req)
.await
.unwrap()
}
#[testlog(tokio::test)]
async fn status() {
let res = Request::builder()
.uri("/status")
.body(Body::empty())
.map(request)
.unwrap()
.await;
assert_eq!(res.status(), StatusCode::OK);
}
fn routes() -> impl Iterator<Item = (&'static str, &'static str)> {
iproduct!(
vec!["/1", "/1/2", "/1/2/3", "/1/2/3/4"],
vec!["GET", "PUT", "DELETE"]
)
}
#[testlog(tokio::test)]
async fn no_token() {
for (uri, method) in routes() {
info!(%uri, %method);
let res = Request::builder()
.uri(uri)
.method(method)
.body(Body::empty())
.map(request)
.unwrap()
.await;
assert!(matches!(
res.status(),
StatusCode::METHOD_NOT_ALLOWED | StatusCode::BAD_REQUEST
));
}
}
#[testlog(tokio::test)]
async fn invalid_token() {
for (uri, method) in routes() {
info!(%uri, %method);
let status = Request::builder()
.uri(uri)
.header("Authorization", "Bearer 123")
.method(method)
.body(Body::empty())
.map(request)
.unwrap()
.await;
assert!(matches!(
status.status(),
StatusCode::METHOD_NOT_ALLOWED | StatusCode::BAD_REQUEST
));
}
}
const TENANT_ID: TenantId =
TenantId::from_array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6]);
const TIMELINE_ID: TimelineId =
TimelineId::from_array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 7]);
const ENDPOINT_ID: &str = "ep-winter-frost-a662z3vg";
fn token() -> String {
let claims = object_storage::Claims {
tenant_id: TENANT_ID,
timeline_id: TIMELINE_ID,
endpoint_id: ENDPOINT_ID.into(),
exp: u64::MAX,
};
let key = jsonwebtoken::EncodingKey::from_ed_pem(TEST_PRIV_KEY_ED25519).unwrap();
let header = jsonwebtoken::Header::new(object_storage::VALIDATION_ALGO);
jsonwebtoken::encode(&header, &claims, &key).unwrap()
}
#[testlog(tokio::test)]
async fn unauthorized() {
let (proxy, _) = proxy().await;
let mut app = app(Arc::new(proxy)).into_service();
let token = token();
let args = itertools::iproduct!(
vec![TENANT_ID.to_string(), TenantId::generate().to_string()],
vec![TIMELINE_ID.to_string(), TimelineId::generate().to_string()],
vec![ENDPOINT_ID, "ep-ololo"]
)
.skip(1);
for ((uri, method), (tenant, timeline, endpoint)) in iproduct!(routes(), args) {
info!(%uri, %method, %tenant, %timeline, %endpoint);
let request = Request::builder()
.uri(format!("/{tenant}/{timeline}/{endpoint}/sub/path/key"))
.method(method)
.header("Authorization", format!("Bearer {}", token))
.body(Body::empty())
.unwrap();
let status = ServiceExt::ready(&mut app)
.await
.unwrap()
.call(request)
.await
.unwrap()
.status();
assert_eq!(status, StatusCode::UNAUTHORIZED);
}
}
#[testlog(tokio::test)]
async fn method_not_allowed() {
let token = token();
let iter = iproduct!(vec!["", "/.."], vec!["GET", "PUT"]);
for (key, method) in iter {
let status = Request::builder()
.uri(format!("/{TENANT_ID}/{TIMELINE_ID}/{ENDPOINT_ID}{key}"))
.method(method)
.header("Authorization", format!("Bearer {token}"))
.body(Body::empty())
.map(request)
.unwrap()
.await
.status();
assert!(matches!(
status,
StatusCode::BAD_REQUEST | StatusCode::METHOD_NOT_ALLOWED
));
}
}
async fn requests_chain(
chain: impl Iterator<Item = (String, &str, &'static str, StatusCode, bool)>,
token: impl Fn(&str) -> String,
) {
let (proxy, _) = proxy().await;
let mut app = app(Arc::new(proxy)).into_service();
for (uri, method, body, expected_status, compare_body) in chain {
info!(%uri, %method, %body, %expected_status);
let bearer = format!("Bearer {}", token(&uri));
let request = Request::builder()
.uri(uri)
.method(method)
.header("Authorization", &bearer)
.body(Body::from(body))
.unwrap();
let response = ServiceExt::ready(&mut app)
.await
.unwrap()
.call(request)
.await
.unwrap();
assert_eq!(response.status(), expected_status);
if !compare_body {
continue;
}
let read_body = response.into_body().collect().await.unwrap().to_bytes();
assert_eq!(body, read_body);
}
}
#[testlog(tokio::test)]
async fn metrics() {
let uri = format!("/{TENANT_ID}/{TIMELINE_ID}/{ENDPOINT_ID}/key");
let req = vec![
(uri.clone(), "PUT", "body", StatusCode::OK, false),
(uri.clone(), "DELETE", "", StatusCode::OK, false),
];
requests_chain(req.into_iter(), |_| token()).await;
let res = Request::builder()
.uri("/metrics")
.body(Body::empty())
.map(request)
.unwrap()
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = res.into_body().collect().await.unwrap().to_bytes();
let body = String::from_utf8_lossy(&body);
tracing::debug!(%body);
// Storage metrics are not gathered for LocalFs
if var(REAL_S3_ENV).is_ok() {
assert!(body.contains("remote_storage_s3_deleted_objects_total"));
}
assert!(body.contains("process_threads"));
}
#[testlog(tokio::test)]
async fn insert_retrieve_remove() {
let uri = format!("/{TENANT_ID}/{TIMELINE_ID}/{ENDPOINT_ID}/key");
let chain = vec![
(uri.clone(), "GET", "", StatusCode::NOT_FOUND, false),
(uri.clone(), "PUT", "пыщьпыщь", StatusCode::OK, false),
(uri.clone(), "GET", "пыщьпыщь", StatusCode::OK, true),
(uri.clone(), "DELETE", "", StatusCode::OK, false),
(uri, "GET", "", StatusCode::NOT_FOUND, false),
];
requests_chain(chain.into_iter(), |_| token()).await;
}
fn delete_prefix_token(uri: &str) -> String {
use serde::Serialize;
let parts = uri.split("/").collect::<Vec<&str>>();
#[derive(Serialize)]
struct PrefixClaims {
tenant_id: TenantId,
timeline_id: Option<TimelineId>,
endpoint_id: Option<object_storage::EndpointId>,
exp: u64,
}
let claims = PrefixClaims {
tenant_id: parts.get(1).map(|c| c.parse().unwrap()).unwrap(),
timeline_id: parts.get(2).map(|c| c.parse().unwrap()),
endpoint_id: parts.get(3).map(ToString::to_string),
exp: u64::MAX,
};
let key = jsonwebtoken::EncodingKey::from_ed_pem(TEST_PRIV_KEY_ED25519).unwrap();
let header = jsonwebtoken::Header::new(object_storage::VALIDATION_ALGO);
jsonwebtoken::encode(&header, &claims, &key).unwrap()
}
// Can't use single digit numbers as they won't be validated as TimelineId and EndpointId
#[testlog(tokio::test)]
async fn delete_prefix() {
let tenant_id =
TenantId::from_array([1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]).to_string();
let t2 = TimelineId::from_array([2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]);
let t3 = TimelineId::from_array([3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]);
let t4 = TimelineId::from_array([4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]);
let f = |timeline, path| format!("/{tenant_id}/{timeline}{path}");
// Why extra slash in string literals? Axum is weird with URIs:
// /1/2 and 1/2/ match different routes, thus first yields OK and second NOT_FOUND
// as it matches /tenant/timeline/endpoint, see https://stackoverflow.com/a/75355932
// The cost of removing trailing slash is suprisingly hard:
// * Add tower dependency with NormalizePath layer
// * wrap Router<()> in this layer https://github.com/tokio-rs/axum/discussions/2377
// * Rewrite make_service() -> into_make_service()
// * Rewrite oneshot() (not available for NormalizePath)
// I didn't manage to get it working correctly
let chain = vec![
// create 1/2/3/4, 1/2/3/5, delete prefix 1/2/3 -> empty
(f(t2, "/3/4"), "PUT", "", StatusCode::OK, false),
(f(t2, "/3/4"), "PUT", "", StatusCode::OK, false), // we can override file contents
(f(t2, "/3/5"), "PUT", "", StatusCode::OK, false),
(f(t2, "/3"), "DELETE", "", StatusCode::OK, false),
(f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false),
(f(t2, "/3/5"), "GET", "", StatusCode::NOT_FOUND, false),
// create 1/2/3/4, 1/2/5/6, delete prefix 1/2/3 -> 1/2/5/6
(f(t2, "/3/4"), "PUT", "", StatusCode::OK, false),
(f(t2, "/5/6"), "PUT", "", StatusCode::OK, false),
(f(t2, "/3"), "DELETE", "", StatusCode::OK, false),
(f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false),
(f(t2, "/5/6"), "GET", "", StatusCode::OK, false),
// create 1/2/3/4, 1/2/7/8, delete prefix 1/2 -> empty
(f(t2, "/3/4"), "PUT", "", StatusCode::OK, false),
(f(t2, "/7/8"), "PUT", "", StatusCode::OK, false),
(f(t2, ""), "DELETE", "", StatusCode::OK, false),
(f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false),
(f(t2, "/7/8"), "GET", "", StatusCode::NOT_FOUND, false),
// create 1/2/3/4, 1/2/5/6, 1/3/8/9, delete prefix 1/2/3 -> 1/2/5/6, 1/3/8/9
(f(t2, "/3/4"), "PUT", "", StatusCode::OK, false),
(f(t2, "/5/6"), "PUT", "", StatusCode::OK, false),
(f(t3, "/8/9"), "PUT", "", StatusCode::OK, false),
(f(t2, "/3"), "DELETE", "", StatusCode::OK, false),
(f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false),
(f(t2, "/5/6"), "GET", "", StatusCode::OK, false),
(f(t3, "/8/9"), "GET", "", StatusCode::OK, false),
// create 1/4/5/6, delete prefix 1/2 -> 1/3/8/9, 1/4/5/6
(f(t4, "/5/6"), "PUT", "", StatusCode::OK, false),
(f(t2, ""), "DELETE", "", StatusCode::OK, false),
(f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false),
(f(t2, "/5/6"), "GET", "", StatusCode::NOT_FOUND, false),
(f(t3, "/8/9"), "GET", "", StatusCode::OK, false),
(f(t4, "/5/6"), "GET", "", StatusCode::OK, false),
// delete prefix 1 -> empty
(format!("/{tenant_id}"), "DELETE", "", StatusCode::OK, false),
(f(t2, "/3/4"), "GET", "", StatusCode::NOT_FOUND, false),
(f(t2, "/5/6"), "GET", "", StatusCode::NOT_FOUND, false),
(f(t3, "/8/9"), "GET", "", StatusCode::NOT_FOUND, false),
(f(t4, "/5/6"), "GET", "", StatusCode::NOT_FOUND, false),
];
requests_chain(chain.into_iter(), delete_prefix_token).await;
}
}

344
object_storage/src/lib.rs Normal file
View File

@@ -0,0 +1,344 @@
use anyhow::Result;
use axum::extract::{FromRequestParts, Path};
use axum::response::{IntoResponse, Response};
use axum::{RequestPartsExt, http::StatusCode, http::request::Parts};
use axum_extra::TypedHeader;
use axum_extra::headers::{Authorization, authorization::Bearer};
use camino::Utf8PathBuf;
use jsonwebtoken::{DecodingKey, Validation};
use remote_storage::{GenericRemoteStorage, RemotePath};
use serde::{Deserialize, Serialize};
use std::fmt::Display;
use std::result::Result as StdResult;
use std::sync::Arc;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error};
use utils::id::{TenantId, TimelineId};
// simplified version of utils::auth::JwtAuth
pub struct JwtAuth {
decoding_key: DecodingKey,
validation: Validation,
}
pub const VALIDATION_ALGO: jsonwebtoken::Algorithm = jsonwebtoken::Algorithm::EdDSA;
impl JwtAuth {
pub fn new(key: &[u8]) -> Result<Self> {
Ok(Self {
decoding_key: DecodingKey::from_ed_pem(key)?,
validation: Validation::new(VALIDATION_ALGO),
})
}
pub fn decode<T: serde::de::DeserializeOwned>(&self, token: &str) -> Result<T> {
Ok(jsonwebtoken::decode(token, &self.decoding_key, &self.validation).map(|t| t.claims)?)
}
}
fn normalize_key(key: &str) -> StdResult<Utf8PathBuf, String> {
let key = clean_utf8(&Utf8PathBuf::from(key));
if key.starts_with("..") || key == "." || key == "/" {
return Err(format!("invalid key {key}"));
}
match key.strip_prefix("/").map(Utf8PathBuf::from) {
Ok(p) => Ok(p),
_ => Ok(key),
}
}
// Copied from path_clean crate with PathBuf->Utf8PathBuf
fn clean_utf8(path: &camino::Utf8Path) -> Utf8PathBuf {
use camino::Utf8Component as Comp;
let mut out = Vec::new();
for comp in path.components() {
match comp {
Comp::CurDir => (),
Comp::ParentDir => match out.last() {
Some(Comp::RootDir) => (),
Some(Comp::Normal(_)) => {
out.pop();
}
None | Some(Comp::CurDir) | Some(Comp::ParentDir) | Some(Comp::Prefix(_)) => {
out.push(comp)
}
},
comp => out.push(comp),
}
}
if !out.is_empty() {
out.iter().collect()
} else {
Utf8PathBuf::from(".")
}
}
pub struct Storage {
pub auth: JwtAuth,
pub storage: GenericRemoteStorage,
pub cancel: CancellationToken,
pub max_upload_file_limit: usize,
}
pub type EndpointId = String; // If needed, reuse small string from proxy/src/types.rc
#[derive(Deserialize, Serialize, PartialEq)]
pub struct Claims {
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
pub endpoint_id: EndpointId,
pub exp: u64,
}
impl Display for Claims {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Claims(tenant_id {} timeline_id {} endpoint_id {} exp {})",
self.tenant_id, self.timeline_id, self.endpoint_id, self.exp
)
}
}
#[derive(Deserialize, Serialize)]
struct KeyRequest {
tenant_id: TenantId,
timeline_id: TimelineId,
endpoint_id: EndpointId,
path: String,
}
#[derive(Debug, PartialEq)]
pub struct S3Path {
pub path: RemotePath,
}
impl TryFrom<&KeyRequest> for S3Path {
type Error = String;
fn try_from(req: &KeyRequest) -> StdResult<Self, Self::Error> {
let KeyRequest {
tenant_id,
timeline_id,
endpoint_id,
path,
} = &req;
let prefix = format!("{tenant_id}/{timeline_id}/{endpoint_id}",);
let path = Utf8PathBuf::from(prefix).join(normalize_key(path)?);
let path = RemotePath::new(&path).unwrap(); // unwrap() because the path is already relative
Ok(S3Path { path })
}
}
fn unauthorized(route: impl Display, claims: impl Display) -> Response {
debug!(%route, %claims, "route doesn't match claims");
StatusCode::UNAUTHORIZED.into_response()
}
pub fn bad_request(err: impl Display, desc: &'static str) -> Response {
debug!(%err, desc);
(StatusCode::BAD_REQUEST, err.to_string()).into_response()
}
pub fn ok() -> Response {
StatusCode::OK.into_response()
}
pub fn internal_error(err: impl Display, path: impl Display, desc: &'static str) -> Response {
error!(%err, %path, desc);
StatusCode::INTERNAL_SERVER_ERROR.into_response()
}
pub fn not_found(key: impl ToString) -> Response {
(StatusCode::NOT_FOUND, key.to_string()).into_response()
}
impl FromRequestParts<Arc<Storage>> for S3Path {
type Rejection = Response;
async fn from_request_parts(
parts: &mut Parts,
state: &Arc<Storage>,
) -> Result<Self, Self::Rejection> {
let Path(path): Path<KeyRequest> = parts
.extract()
.await
.map_err(|e| bad_request(e, "invalid route"))?;
let TypedHeader(Authorization(bearer)) = parts
.extract::<TypedHeader<Authorization<Bearer>>>()
.await
.map_err(|e| bad_request(e, "invalid token"))?;
let claims: Claims = state
.auth
.decode(bearer.token())
.map_err(|e| bad_request(e, "decoding token"))?;
let route = Claims {
tenant_id: path.tenant_id,
timeline_id: path.timeline_id,
endpoint_id: path.endpoint_id.clone(),
exp: claims.exp,
};
if route != claims {
return Err(unauthorized(route, claims));
}
(&path)
.try_into()
.map_err(|e| bad_request(e, "invalid route"))
}
}
#[derive(Deserialize, Serialize, PartialEq)]
pub struct PrefixKeyPath {
pub tenant_id: TenantId,
pub timeline_id: Option<TimelineId>,
pub endpoint_id: Option<EndpointId>,
}
impl Display for PrefixKeyPath {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"PrefixKeyPath(tenant_id {} timeline_id {} endpoint_id {})",
self.tenant_id,
self.timeline_id
.as_ref()
.map(ToString::to_string)
.unwrap_or("".to_string()),
self.endpoint_id
.as_ref()
.map(ToString::to_string)
.unwrap_or("".to_string())
)
}
}
#[derive(Debug, PartialEq)]
pub struct PrefixS3Path {
pub path: RemotePath,
}
impl From<&PrefixKeyPath> for PrefixS3Path {
fn from(path: &PrefixKeyPath) -> Self {
let timeline_id = path
.timeline_id
.as_ref()
.map(ToString::to_string)
.unwrap_or("".to_string());
let endpoint_id = path
.endpoint_id
.as_ref()
.map(ToString::to_string)
.unwrap_or("".to_string());
let path = Utf8PathBuf::from(path.tenant_id.to_string())
.join(timeline_id)
.join(endpoint_id);
let path = RemotePath::new(&path).unwrap(); // unwrap() because the path is already relative
PrefixS3Path { path }
}
}
impl FromRequestParts<Arc<Storage>> for PrefixS3Path {
type Rejection = Response;
async fn from_request_parts(
parts: &mut Parts,
state: &Arc<Storage>,
) -> Result<Self, Self::Rejection> {
let Path(path) = parts
.extract::<Path<PrefixKeyPath>>()
.await
.map_err(|e| bad_request(e, "invalid route"))?;
let TypedHeader(Authorization(bearer)) = parts
.extract::<TypedHeader<Authorization<Bearer>>>()
.await
.map_err(|e| bad_request(e, "invalid token"))?;
let claims: PrefixKeyPath = state
.auth
.decode(bearer.token())
.map_err(|e| bad_request(e, "invalid token"))?;
if path != claims {
return Err(unauthorized(path, claims));
}
Ok((&path).into())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn normalize_key() {
let f = super::normalize_key;
assert_eq!(f("hello/world/..").unwrap(), Utf8PathBuf::from("hello"));
assert_eq!(
f("ololo/1/../../not_ololo").unwrap(),
Utf8PathBuf::from("not_ololo")
);
assert!(f("ololo/1/../../../").is_err());
assert!(f(".").is_err());
assert!(f("../").is_err());
assert!(f("").is_err());
assert_eq!(f("/1/2/3").unwrap(), Utf8PathBuf::from("1/2/3"));
assert!(f("/1/2/3/../../../").is_err());
assert!(f("/1/2/3/../../../../").is_err());
}
const TENANT_ID: TenantId =
TenantId::from_array([1, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6]);
const TIMELINE_ID: TimelineId =
TimelineId::from_array([1, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 7]);
const ENDPOINT_ID: &str = "ep-winter-frost-a662z3vg";
#[test]
fn s3_path() {
let auth = Claims {
tenant_id: TENANT_ID,
timeline_id: TIMELINE_ID,
endpoint_id: ENDPOINT_ID.into(),
exp: u64::MAX,
};
let s3_path = |key| {
let path = &format!("{TENANT_ID}/{TIMELINE_ID}/{ENDPOINT_ID}/{key}");
let path = RemotePath::from_string(path).unwrap();
S3Path { path }
};
let path = "cache_key".to_string();
let mut key_path = KeyRequest {
path,
tenant_id: auth.tenant_id,
timeline_id: auth.timeline_id,
endpoint_id: auth.endpoint_id,
};
assert_eq!(S3Path::try_from(&key_path).unwrap(), s3_path(key_path.path));
key_path.path = "we/can/have/nested/paths".to_string();
assert_eq!(S3Path::try_from(&key_path).unwrap(), s3_path(key_path.path));
key_path.path = "../error/hello/../".to_string();
assert!(S3Path::try_from(&key_path).is_err());
}
#[test]
fn prefix_s3_path() {
let mut path = PrefixKeyPath {
tenant_id: TENANT_ID,
timeline_id: None,
endpoint_id: None,
};
let prefix_path = |s: String| RemotePath::from_string(&s).unwrap();
assert_eq!(
PrefixS3Path::from(&path).path,
prefix_path(format!("{TENANT_ID}"))
);
path.timeline_id = Some(TIMELINE_ID);
assert_eq!(
PrefixS3Path::from(&path).path,
prefix_path(format!("{TENANT_ID}/{TIMELINE_ID}"))
);
path.endpoint_id = Some(ENDPOINT_ID.into());
assert_eq!(
PrefixS3Path::from(&path).path,
prefix_path(format!("{TENANT_ID}/{TIMELINE_ID}/{ENDPOINT_ID}"))
);
}
}

View File

@@ -0,0 +1,65 @@
//! `object_storage` is a service which provides API for uploading and downloading
//! files. It is used by compute and control plane for accessing LFC prewarm data.
//! This service is deployed either as a separate component or as part of compute image
//! for large computes.
mod app;
use anyhow::Context;
use tracing::info;
use utils::logging;
//see set()
const fn max_upload_file_limit() -> usize {
100 * 1024 * 1024
}
#[derive(serde::Deserialize)]
#[serde(tag = "type")]
struct Config {
listen: std::net::SocketAddr,
pemfile: camino::Utf8PathBuf,
#[serde(flatten)]
storage_config: remote_storage::RemoteStorageConfig,
#[serde(default = "max_upload_file_limit")]
max_upload_file_limit: usize,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
logging::init(
logging::LogFormat::Plain,
logging::TracingErrorLayerEnablement::EnableWithRustLogFilter,
logging::Output::Stdout,
)?;
let config: String = std::env::args().skip(1).take(1).collect();
if config.is_empty() {
anyhow::bail!("Usage: object_storage config.json")
}
info!("Reading config from {config}");
let config = std::fs::read_to_string(config.clone())?;
let config: Config = serde_json::from_str(&config).context("parsing config")?;
info!("Reading pemfile from {}", config.pemfile.clone());
let pemfile = std::fs::read(config.pemfile.clone())?;
info!("Loading public key from {}", config.pemfile.clone());
let auth = object_storage::JwtAuth::new(&pemfile)?;
let listener = tokio::net::TcpListener::bind(config.listen).await.unwrap();
info!("listening on {}", listener.local_addr().unwrap());
let storage = remote_storage::GenericRemoteStorage::from_config(&config.storage_config).await?;
let cancel = tokio_util::sync::CancellationToken::new();
app::check_storage_permissions(&storage, cancel.clone()).await?;
let proxy = std::sync::Arc::new(object_storage::Storage {
auth,
storage,
cancel: cancel.clone(),
max_upload_file_limit: config.max_upload_file_limit,
});
tokio::spawn(utils::signals::signal_handler(cancel.clone()));
axum::serve(listener, app::app(proxy))
.with_graceful_shutdown(async move { cancel.cancelled().await })
.await?;
Ok(())
}

View File

@@ -31,7 +31,6 @@ use pageserver::{
};
use postgres_backend::AuthType;
use remote_storage::GenericRemoteStorage;
use tokio::signal::unix::SignalKind;
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use tracing::*;
@@ -744,32 +743,7 @@ fn start_pageserver(
let signal_token = CancellationToken::new();
let signal_cancel = signal_token.child_token();
// Spawn signal handlers. Runs in a loop since we want to be responsive to multiple signals
// even after triggering shutdown (e.g. a SIGQUIT after a slow SIGTERM shutdown). See:
// https://github.com/neondatabase/neon/issues/9740.
tokio::spawn(async move {
let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt()).unwrap();
let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate()).unwrap();
let mut sigquit = tokio::signal::unix::signal(SignalKind::quit()).unwrap();
loop {
let signal = tokio::select! {
_ = sigquit.recv() => {
info!("Got signal SIGQUIT. Terminating in immediate shutdown mode.");
std::process::exit(111);
}
_ = sigint.recv() => "SIGINT",
_ = sigterm.recv() => "SIGTERM",
};
if !signal_token.is_cancelled() {
info!("Got signal {signal}. Terminating gracefully in fast shutdown mode.");
signal_token.cancel();
} else {
info!("Got signal {signal}. Already shutting down.");
}
}
});
tokio::spawn(utils::signals::signal_handler(signal_token));
// Wait for cancellation signal and shut down the pageserver.
//

View File

@@ -67,7 +67,7 @@ use crate::tenant::mgr::{
};
use crate::tenant::remote_timeline_client::index::GcCompactionState;
use crate::tenant::remote_timeline_client::{
download_index_part, list_remote_tenant_shards, list_remote_timelines,
download_index_part, download_tenant_manifest, list_remote_tenant_shards, list_remote_timelines,
};
use crate::tenant::secondary::SecondaryController;
use crate::tenant::size::ModelInputs;
@@ -2911,9 +2911,22 @@ async fn tenant_scan_remote_handler(
};
}
let result =
download_tenant_manifest(&state.remote_storage, &tenant_shard_id, generation, &cancel)
.instrument(info_span!("download_tenant_manifest",
tenant_id=%tenant_shard_id.tenant_id,
shard_id=%tenant_shard_id.shard_slug()))
.await;
let stripe_size = match result {
Ok((manifest, _, _)) => manifest.stripe_size,
Err(DownloadError::NotFound) => None,
Err(err) => return Err(ApiError::InternalServerError(anyhow!(err))),
};
response.shards.push(TenantScanRemoteStorageShard {
tenant_shard_id,
generation: generation.into(),
stripe_size,
});
}

View File

@@ -45,6 +45,7 @@ use remote_timeline_client::manifest::{
};
use remote_timeline_client::{
FAILED_REMOTE_OP_RETRIES, FAILED_UPLOAD_WARN_THRESHOLD, UploadQueueNotReadyError,
download_tenant_manifest,
};
use secondary::heatmap::{HeatMapTenant, HeatMapTimeline};
use storage_broker::BrokerClientChannel;
@@ -226,7 +227,8 @@ struct TimelinePreload {
}
pub(crate) struct TenantPreload {
tenant_manifest: TenantManifest,
/// The tenant manifest from remote storage, or None if no manifest was found.
tenant_manifest: Option<TenantManifest>,
/// Map from timeline ID to a possible timeline preload. It is None iff the timeline is offloaded according to the manifest.
timelines: HashMap<TimelineId, Option<TimelinePreload>>,
}
@@ -282,12 +284,15 @@ pub struct Tenant {
/// **Lock order**: if acquiring all (or a subset), acquire them in order `timelines`, `timelines_offloaded`, `timelines_creating`
timelines_offloaded: Mutex<HashMap<TimelineId, Arc<OffloadedTimeline>>>,
/// Serialize writes of the tenant manifest to remote storage. If there are concurrent operations
/// affecting the manifest, such as timeline deletion and timeline offload, they must wait for
/// each other (this could be optimized to coalesce writes if necessary).
/// The last tenant manifest known to be in remote storage. None if the manifest has not yet
/// been either downloaded or uploaded. Always Some after tenant attach.
///
/// The contents of the Mutex are the last manifest we successfully uploaded
tenant_manifest_upload: tokio::sync::Mutex<Option<TenantManifest>>,
/// Initially populated during tenant attach, updated via `maybe_upload_tenant_manifest`.
///
/// Do not modify this directly. It is used to check whether a new manifest needs to be
/// uploaded. The manifest is constructed in `build_tenant_manifest`, and uploaded via
/// `maybe_upload_tenant_manifest`.
remote_tenant_manifest: tokio::sync::Mutex<Option<TenantManifest>>,
// This mutex prevents creation of new timelines during GC.
// Adding yet another mutex (in addition to `timelines`) is needed because holding
@@ -1354,36 +1359,41 @@ impl Tenant {
}
}
// Ideally we should use Tenant::set_broken_no_wait, but it is not supposed to be used when tenant is in loading state.
enum BrokenVerbosity {
Error,
Info
}
let make_broken =
|t: &Tenant, err: anyhow::Error, verbosity: BrokenVerbosity| {
match verbosity {
BrokenVerbosity::Info => {
info!("attach cancelled, setting tenant state to Broken: {err}");
},
BrokenVerbosity::Error => {
error!("attach failed, setting tenant state to Broken: {err:?}");
}
fn make_broken_or_stopping(t: &Tenant, err: anyhow::Error) {
t.state.send_modify(|state| match state {
// TODO: the old code alluded to DeleteTenantFlow sometimes setting
// TenantState::Stopping before we get here, but this may be outdated.
// Let's find out with a testing assertion. If this doesn't fire, and the
// logs don't show this happening in production, remove the Stopping cases.
TenantState::Stopping{..} if cfg!(any(test, feature = "testing")) => {
panic!("unexpected TenantState::Stopping during attach")
}
t.state.send_modify(|state| {
// The Stopping case is for when we have passed control on to DeleteTenantFlow:
// if it errors, we will call make_broken when tenant is already in Stopping.
assert!(
matches!(*state, TenantState::Attaching | TenantState::Stopping { .. }),
"the attach task owns the tenant state until activation is complete"
);
*state = TenantState::broken_from_reason(err.to_string());
});
};
// If the tenant is cancelled, assume the error was caused by cancellation.
TenantState::Attaching if t.cancel.is_cancelled() => {
info!("attach cancelled, setting tenant state to Stopping: {err}");
// NB: progress None tells `set_stopping` that attach has cancelled.
*state = TenantState::Stopping { progress: None };
}
// According to the old code, DeleteTenantFlow may already have set this to
// Stopping. Retain its progress.
// TODO: there is no DeleteTenantFlow. Is this still needed? See above.
TenantState::Stopping { progress } if t.cancel.is_cancelled() => {
assert!(progress.is_some(), "concurrent attach cancellation");
info!("attach cancelled, already Stopping: {err}");
}
// Mark the tenant as broken.
TenantState::Attaching | TenantState::Stopping { .. } => {
error!("attach failed, setting tenant state to Broken (was {state}): {err:?}");
*state = TenantState::broken_from_reason(err.to_string())
}
// The attach task owns the tenant state until activated.
state => panic!("invalid tenant state {state} during attach: {err:?}"),
});
}
// TODO: should also be rejecting tenant conf changes that violate this check.
if let Err(e) = crate::tenant::storage_layer::inmemory_layer::IndexEntry::validate_checkpoint_distance(tenant_clone.get_checkpoint_distance()) {
make_broken(&tenant_clone, anyhow::anyhow!(e), BrokenVerbosity::Error);
make_broken_or_stopping(&tenant_clone, anyhow::anyhow!(e));
return Ok(());
}
@@ -1435,10 +1445,8 @@ impl Tenant {
// stayed in Activating for such a long time that shutdown found it in
// that state.
tracing::info!(state=%tenant_clone.current_state(), "Tenant shut down before activation");
// Make the tenant broken so that set_stopping will not hang waiting for it to leave
// the Attaching state. This is an over-reaction (nothing really broke, the tenant is
// just shutting down), but ensures progress.
make_broken(&tenant_clone, anyhow::anyhow!("Shut down while Attaching"), BrokenVerbosity::Info);
// Set the tenant to Stopping to signal `set_stopping` that we're done.
make_broken_or_stopping(&tenant_clone, anyhow::anyhow!("Shut down while Attaching"));
return Ok(());
},
)
@@ -1457,7 +1465,7 @@ impl Tenant {
match res {
Ok(p) => Some(p),
Err(e) => {
make_broken(&tenant_clone, anyhow::anyhow!(e), BrokenVerbosity::Error);
make_broken_or_stopping(&tenant_clone, anyhow::anyhow!(e));
return Ok(());
}
}
@@ -1483,9 +1491,7 @@ impl Tenant {
info!("attach finished, activating");
tenant_clone.activate(broker_client, None, &ctx);
}
Err(e) => {
make_broken(&tenant_clone, anyhow::anyhow!(e), BrokenVerbosity::Error);
}
Err(e) => make_broken_or_stopping(&tenant_clone, anyhow::anyhow!(e)),
}
// If we are doing an opportunistic warmup attachment at startup, initialize
@@ -1525,28 +1531,27 @@ impl Tenant {
cancel.clone(),
)
.await?;
let (offloaded_add, tenant_manifest) =
match remote_timeline_client::download_tenant_manifest(
remote_storage,
&self.tenant_shard_id,
self.generation,
&cancel,
)
.await
{
Ok((tenant_manifest, _generation, _manifest_mtime)) => (
format!("{} offloaded", tenant_manifest.offloaded_timelines.len()),
tenant_manifest,
),
Err(DownloadError::NotFound) => {
("no manifest".to_string(), TenantManifest::empty())
}
Err(e) => Err(e)?,
};
let tenant_manifest = match download_tenant_manifest(
remote_storage,
&self.tenant_shard_id,
self.generation,
&cancel,
)
.await
{
Ok((tenant_manifest, _, _)) => Some(tenant_manifest),
Err(DownloadError::NotFound) => None,
Err(err) => return Err(err.into()),
};
info!(
"found {} timelines, and {offloaded_add}",
remote_timeline_ids.len()
"found {} timelines ({} offloaded timelines)",
remote_timeline_ids.len(),
tenant_manifest
.as_ref()
.map(|m| m.offloaded_timelines.len())
.unwrap_or(0)
);
for k in other_keys {
@@ -1555,11 +1560,13 @@ impl Tenant {
// Avoid downloading IndexPart of offloaded timelines.
let mut offloaded_with_prefix = HashSet::new();
for offloaded in tenant_manifest.offloaded_timelines.iter() {
if remote_timeline_ids.remove(&offloaded.timeline_id) {
offloaded_with_prefix.insert(offloaded.timeline_id);
} else {
// We'll take care later of timelines in the manifest without a prefix
if let Some(tenant_manifest) = &tenant_manifest {
for offloaded in tenant_manifest.offloaded_timelines.iter() {
if remote_timeline_ids.remove(&offloaded.timeline_id) {
offloaded_with_prefix.insert(offloaded.timeline_id);
} else {
// We'll take care later of timelines in the manifest without a prefix
}
}
}
@@ -1633,12 +1640,14 @@ impl Tenant {
let mut offloaded_timeline_ids = HashSet::new();
let mut offloaded_timelines_list = Vec::new();
for timeline_manifest in preload.tenant_manifest.offloaded_timelines.iter() {
let timeline_id = timeline_manifest.timeline_id;
let offloaded_timeline =
OffloadedTimeline::from_manifest(self.tenant_shard_id, timeline_manifest);
offloaded_timelines_list.push((timeline_id, Arc::new(offloaded_timeline)));
offloaded_timeline_ids.insert(timeline_id);
if let Some(tenant_manifest) = &preload.tenant_manifest {
for timeline_manifest in tenant_manifest.offloaded_timelines.iter() {
let timeline_id = timeline_manifest.timeline_id;
let offloaded_timeline =
OffloadedTimeline::from_manifest(self.tenant_shard_id, timeline_manifest);
offloaded_timelines_list.push((timeline_id, Arc::new(offloaded_timeline)));
offloaded_timeline_ids.insert(timeline_id);
}
}
// Complete deletions for offloaded timeline id's from manifest.
// The manifest will be uploaded later in this function.
@@ -1796,15 +1805,21 @@ impl Tenant {
.context("resume_deletion")
.map_err(LoadLocalTimelineError::ResumeDeletion)?;
}
let needs_manifest_upload =
offloaded_timelines_list.len() != preload.tenant_manifest.offloaded_timelines.len();
{
let mut offloaded_timelines_accessor = self.timelines_offloaded.lock().unwrap();
offloaded_timelines_accessor.extend(offloaded_timelines_list.into_iter());
}
if needs_manifest_upload {
self.store_tenant_manifest().await?;
// Stash the preloaded tenant manifest, and upload a new manifest if changed.
//
// NB: this must happen after the tenant is fully populated above. In particular the
// offloaded timelines, which are included in the manifest.
{
let mut guard = self.remote_tenant_manifest.lock().await;
assert!(guard.is_none(), "tenant manifest set before preload"); // first populated here
*guard = preload.tenant_manifest;
}
self.maybe_upload_tenant_manifest().await?;
// The local filesystem contents are a cache of what's in the remote IndexPart;
// IndexPart is the source of truth.
@@ -2218,7 +2233,7 @@ impl Tenant {
};
// Upload new list of offloaded timelines to S3
self.store_tenant_manifest().await?;
self.maybe_upload_tenant_manifest().await?;
// Activate the timeline (if it makes sense)
if !(timeline.is_broken() || timeline.is_stopping()) {
@@ -3429,7 +3444,7 @@ impl Tenant {
shutdown_mode
};
match self.set_stopping(shutdown_progress, false, false).await {
match self.set_stopping(shutdown_progress).await {
Ok(()) => {}
Err(SetStoppingError::Broken) => {
// assume that this is acceptable
@@ -3509,25 +3524,13 @@ impl Tenant {
/// This function waits for the tenant to become active if it isn't already, before transitioning it into Stopping state.
///
/// This function is not cancel-safe!
///
/// `allow_transition_from_loading` is needed for the special case of loading task deleting the tenant.
/// `allow_transition_from_attaching` is needed for the special case of attaching deleted tenant.
async fn set_stopping(
&self,
progress: completion::Barrier,
_allow_transition_from_loading: bool,
allow_transition_from_attaching: bool,
) -> Result<(), SetStoppingError> {
async fn set_stopping(&self, progress: completion::Barrier) -> Result<(), SetStoppingError> {
let mut rx = self.state.subscribe();
// cannot stop before we're done activating, so wait out until we're done activating
rx.wait_for(|state| match state {
TenantState::Attaching if allow_transition_from_attaching => true,
TenantState::Activating(_) | TenantState::Attaching => {
info!(
"waiting for {} to turn Active|Broken|Stopping",
<&'static str>::from(state)
);
info!("waiting for {state} to turn Active|Broken|Stopping");
false
}
TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping { .. } => true,
@@ -3538,25 +3541,24 @@ impl Tenant {
// we now know we're done activating, let's see whether this task is the winner to transition into Stopping
let mut err = None;
let stopping = self.state.send_if_modified(|current_state| match current_state {
TenantState::Activating(_) => {
unreachable!("1we ensured above that we're done with activation, and, there is no re-activation")
}
TenantState::Attaching => {
if !allow_transition_from_attaching {
unreachable!("2we ensured above that we're done with activation, and, there is no re-activation")
};
*current_state = TenantState::Stopping { progress };
true
TenantState::Activating(_) | TenantState::Attaching => {
unreachable!("we ensured above that we're done with activation, and, there is no re-activation")
}
TenantState::Active => {
// FIXME: due to time-of-check vs time-of-use issues, it can happen that new timelines
// are created after the transition to Stopping. That's harmless, as the Timelines
// won't be accessible to anyone afterwards, because the Tenant is in Stopping state.
*current_state = TenantState::Stopping { progress };
*current_state = TenantState::Stopping { progress: Some(progress) };
// Continue stopping outside the closure. We need to grab timelines.lock()
// and we plan to turn it into a tokio::sync::Mutex in a future patch.
true
}
TenantState::Stopping { progress: None } => {
// An attach was cancelled, and the attach transitioned the tenant from Attaching to
// Stopping(None) to let us know it exited. Register our progress and continue.
*current_state = TenantState::Stopping { progress: Some(progress) };
true
}
TenantState::Broken { reason, .. } => {
info!(
"Cannot set tenant to Stopping state, it is in Broken state due to: {reason}"
@@ -3564,7 +3566,7 @@ impl Tenant {
err = Some(SetStoppingError::Broken);
false
}
TenantState::Stopping { progress } => {
TenantState::Stopping { progress: Some(progress) } => {
info!("Tenant is already in Stopping state");
err = Some(SetStoppingError::AlreadyStopping(progress.clone()));
false
@@ -4065,18 +4067,20 @@ impl Tenant {
/// Generate an up-to-date TenantManifest based on the state of this Tenant.
fn build_tenant_manifest(&self) -> TenantManifest {
let timelines_offloaded = self.timelines_offloaded.lock().unwrap();
let mut timeline_manifests = timelines_offloaded
.iter()
.map(|(_timeline_id, offloaded)| offloaded.manifest())
.collect::<Vec<_>>();
// Sort the manifests so that our output is deterministic
timeline_manifests.sort_by_key(|timeline_manifest| timeline_manifest.timeline_id);
// Collect the offloaded timelines, and sort them for deterministic output.
let offloaded_timelines = self
.timelines_offloaded
.lock()
.unwrap()
.values()
.map(|tli| tli.manifest())
.sorted_by_key(|m| m.timeline_id)
.collect_vec();
TenantManifest {
version: LATEST_TENANT_MANIFEST_VERSION,
offloaded_timelines: timeline_manifests,
stripe_size: Some(self.get_shard_stripe_size()),
offloaded_timelines,
}
}
@@ -4299,7 +4303,7 @@ impl Tenant {
timelines: Mutex::new(HashMap::new()),
timelines_creating: Mutex::new(HashSet::new()),
timelines_offloaded: Mutex::new(HashMap::new()),
tenant_manifest_upload: Default::default(),
remote_tenant_manifest: Default::default(),
gc_cs: tokio::sync::Mutex::new(()),
walredo_mgr,
remote_storage,
@@ -5532,27 +5536,35 @@ impl Tenant {
.unwrap_or(0)
}
/// Serialize and write the latest TenantManifest to remote storage.
pub(crate) async fn store_tenant_manifest(&self) -> Result<(), TenantManifestError> {
// Only one manifest write may be done at at time, and the contents of the manifest
// must be loaded while holding this lock. This makes it safe to call this function
// from anywhere without worrying about colliding updates.
/// Builds a new tenant manifest, and uploads it if it differs from the last-known tenant
/// manifest in `Self::remote_tenant_manifest`.
///
/// TODO: instead of requiring callers to remember to call `maybe_upload_tenant_manifest` after
/// changing any `Tenant` state that's included in the manifest, consider making the manifest
/// the authoritative source of data with an API that automatically uploads on changes. Revisit
/// this when the manifest is more widely used and we have a better idea of the data model.
pub(crate) async fn maybe_upload_tenant_manifest(&self) -> Result<(), TenantManifestError> {
// Multiple tasks may call this function concurrently after mutating the Tenant runtime
// state, affecting the manifest generated by `build_tenant_manifest`. We use an async mutex
// to serialize these callers. `eq_ignoring_version` acts as a slightly inefficient but
// simple coalescing mechanism.
let mut guard = tokio::select! {
g = self.tenant_manifest_upload.lock() => {
g
},
_ = self.cancel.cancelled() => {
return Err(TenantManifestError::Cancelled);
}
guard = self.remote_tenant_manifest.lock() => guard,
_ = self.cancel.cancelled() => return Err(TenantManifestError::Cancelled),
};
// Build a new manifest.
let manifest = self.build_tenant_manifest();
if Some(&manifest) == (*guard).as_ref() {
// Optimisation: skip uploads that don't change anything.
return Ok(());
// Check if the manifest has changed. We ignore the version number here, to avoid
// uploading every manifest on version number bumps.
if let Some(old) = guard.as_ref() {
if manifest.eq_ignoring_version(old) {
return Ok(());
}
}
// Remote storage does no retries internally, so wrap it
// Upload the manifest. Remote storage does no retries internally, so retry here.
match backoff::retry(
|| async {
upload_tenant_manifest(
@@ -5564,7 +5576,7 @@ impl Tenant {
)
.await
},
|_e| self.cancel.is_cancelled(),
|_| self.cancel.is_cancelled(),
FAILED_UPLOAD_WARN_THRESHOLD,
FAILED_REMOTE_OP_RETRIES,
"uploading tenant manifest",
@@ -8722,6 +8734,21 @@ mod tests {
Lsn(0x20),
Value::WalRecord(NeonWalRecord::wal_init("i")),
),
(
get_key(4),
Lsn(0x30),
Value::WalRecord(NeonWalRecord::wal_append_conditional("j", "i")),
),
(
get_key(5),
Lsn(0x20),
Value::WalRecord(NeonWalRecord::wal_init("1")),
),
(
get_key(5),
Lsn(0x30),
Value::WalRecord(NeonWalRecord::wal_append_conditional("j", "2")),
),
];
let image1 = vec![(get_key(1), "0x10".into())];
@@ -8752,8 +8779,18 @@ mod tests {
// Need to remove the limit of "Neon WAL redo requires base image".
// assert_eq!(tline.get(get_key(3), Lsn(0x50), &ctx).await?, Bytes::new());
// assert_eq!(tline.get(get_key(4), Lsn(0x50), &ctx).await?, Bytes::new());
assert_eq!(
tline.get(get_key(3), Lsn(0x50), &ctx).await?,
Bytes::from_static(b"c")
);
assert_eq!(
tline.get(get_key(4), Lsn(0x50), &ctx).await?,
Bytes::from_static(b"ij")
);
// Manual testing required: currently, read errors will panic the process in debug mode. So we
// cannot enable this assertion in the unit test.
// assert!(tline.get(get_key(5), Lsn(0x50), &ctx).await.is_err());
Ok(())
}

View File

@@ -1,21 +1,33 @@
use chrono::NaiveDateTime;
use pageserver_api::shard::ShardStripeSize;
use serde::{Deserialize, Serialize};
use utils::id::TimelineId;
use utils::lsn::Lsn;
/// Tenant-shard scoped manifest
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq)]
/// Tenant shard manifest, stored in remote storage. Contains offloaded timelines and other tenant
/// shard-wide information that must be persisted in remote storage.
///
/// The manifest is always updated on tenant attach, and as needed.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct TenantManifest {
/// Debugging aid describing the version of this manifest.
/// Can also be used for distinguishing breaking changes later on.
/// The manifest version. Incremented on manifest format changes, even non-breaking ones.
/// Manifests must generally always be backwards and forwards compatible for one release, to
/// allow release rollbacks.
pub version: usize,
/// This tenant's stripe size. This is only advisory, and used to recover tenant data from
/// remote storage. The autoritative source is the storage controller. If None, assume the
/// original default value of 32768 blocks (256 MB).
#[serde(skip_serializing_if = "Option::is_none")]
pub stripe_size: Option<ShardStripeSize>,
/// The list of offloaded timelines together with enough information
/// to not have to actually load them.
///
/// Note: the timelines mentioned in this list might be deleted, i.e.
/// we don't hold an invariant that the references aren't dangling.
/// Existence of index-part.json is the actual indicator of timeline existence.
#[serde(default)]
pub offloaded_timelines: Vec<OffloadedTimelineManifest>,
}
@@ -24,7 +36,7 @@ pub struct TenantManifest {
/// Very similar to [`pageserver_api::models::OffloadedTimelineInfo`],
/// but the two datastructures serve different needs, this is for a persistent disk format
/// that must be backwards compatible, while the other is only for informative purposes.
#[derive(Clone, Serialize, Deserialize, Copy, PartialEq, Eq)]
#[derive(Clone, Debug, Serialize, Deserialize, Copy, PartialEq, Eq)]
pub struct OffloadedTimelineManifest {
pub timeline_id: TimelineId,
/// Whether the timeline has a parent it has been branched off from or not
@@ -35,20 +47,166 @@ pub struct OffloadedTimelineManifest {
pub archived_at: NaiveDateTime,
}
pub const LATEST_TENANT_MANIFEST_VERSION: usize = 1;
/// The newest manifest version. This should be incremented on changes, even non-breaking ones. We
/// do not use deny_unknown_fields, so new fields are not breaking.
///
/// 1: initial version
/// 2: +stripe_size
///
/// When adding new versions, also add a parse_vX test case below.
pub const LATEST_TENANT_MANIFEST_VERSION: usize = 2;
impl TenantManifest {
pub(crate) fn empty() -> Self {
Self {
version: LATEST_TENANT_MANIFEST_VERSION,
offloaded_timelines: vec![],
/// Returns true if the manifests are equal, ignoring the version number. This avoids
/// re-uploading all manifests just because the version number is bumped.
pub fn eq_ignoring_version(&self, other: &Self) -> bool {
// Fast path: if the version is equal, just compare directly.
if self.version == other.version {
return self == other;
}
}
pub fn from_json_bytes(bytes: &[u8]) -> Result<Self, serde_json::Error> {
serde_json::from_slice::<Self>(bytes)
// We could alternatively just clone and modify the version here.
let Self {
version: _, // ignore version
stripe_size,
offloaded_timelines,
} = self;
stripe_size == &other.stripe_size && offloaded_timelines == &other.offloaded_timelines
}
pub(crate) fn to_json_bytes(&self) -> serde_json::Result<Vec<u8>> {
/// Decodes a manifest from JSON.
pub fn from_json_bytes(bytes: &[u8]) -> Result<Self, serde_json::Error> {
serde_json::from_slice(bytes)
}
/// Encodes a manifest as JSON.
pub fn to_json_bytes(&self) -> serde_json::Result<Vec<u8>> {
serde_json::to_vec(self)
}
}
#[cfg(test)]
mod tests {
use std::str::FromStr;
use utils::id::TimelineId;
use super::*;
/// Empty manifests should be parsed. Version is required.
#[test]
fn parse_empty() -> anyhow::Result<()> {
let json = r#"{
"version": 0
}"#;
let expected = TenantManifest {
version: 0,
stripe_size: None,
offloaded_timelines: Vec::new(),
};
assert_eq!(expected, TenantManifest::from_json_bytes(json.as_bytes())?);
Ok(())
}
/// Unknown fields should be ignored, for forwards compatibility.
#[test]
fn parse_unknown_fields() -> anyhow::Result<()> {
let json = r#"{
"version": 1,
"foo": "bar"
}"#;
let expected = TenantManifest {
version: 1,
stripe_size: None,
offloaded_timelines: Vec::new(),
};
assert_eq!(expected, TenantManifest::from_json_bytes(json.as_bytes())?);
Ok(())
}
/// v1 manifests should be parsed, for backwards compatibility.
#[test]
fn parse_v1() -> anyhow::Result<()> {
let json = r#"{
"version": 1,
"offloaded_timelines": [
{
"timeline_id": "5c4df612fd159e63c1b7853fe94d97da",
"archived_at": "2025-03-07T11:07:11.373105434"
},
{
"timeline_id": "f3def5823ad7080d2ea538d8e12163fa",
"ancestor_timeline_id": "5c4df612fd159e63c1b7853fe94d97da",
"ancestor_retain_lsn": "0/1F79038",
"archived_at": "2025-03-05T11:10:22.257901390"
}
]
}"#;
let expected = TenantManifest {
version: 1,
stripe_size: None,
offloaded_timelines: vec![
OffloadedTimelineManifest {
timeline_id: TimelineId::from_str("5c4df612fd159e63c1b7853fe94d97da")?,
ancestor_timeline_id: None,
ancestor_retain_lsn: None,
archived_at: NaiveDateTime::from_str("2025-03-07T11:07:11.373105434")?,
},
OffloadedTimelineManifest {
timeline_id: TimelineId::from_str("f3def5823ad7080d2ea538d8e12163fa")?,
ancestor_timeline_id: Some(TimelineId::from_str(
"5c4df612fd159e63c1b7853fe94d97da",
)?),
ancestor_retain_lsn: Some(Lsn::from_str("0/1F79038")?),
archived_at: NaiveDateTime::from_str("2025-03-05T11:10:22.257901390")?,
},
],
};
assert_eq!(expected, TenantManifest::from_json_bytes(json.as_bytes())?);
Ok(())
}
/// v2 manifests should be parsed, for backwards compatibility.
#[test]
fn parse_v2() -> anyhow::Result<()> {
let json = r#"{
"version": 2,
"stripe_size": 32768,
"offloaded_timelines": [
{
"timeline_id": "5c4df612fd159e63c1b7853fe94d97da",
"archived_at": "2025-03-07T11:07:11.373105434"
},
{
"timeline_id": "f3def5823ad7080d2ea538d8e12163fa",
"ancestor_timeline_id": "5c4df612fd159e63c1b7853fe94d97da",
"ancestor_retain_lsn": "0/1F79038",
"archived_at": "2025-03-05T11:10:22.257901390"
}
]
}"#;
let expected = TenantManifest {
version: 2,
stripe_size: Some(ShardStripeSize(32768)),
offloaded_timelines: vec![
OffloadedTimelineManifest {
timeline_id: TimelineId::from_str("5c4df612fd159e63c1b7853fe94d97da")?,
ancestor_timeline_id: None,
ancestor_retain_lsn: None,
archived_at: NaiveDateTime::from_str("2025-03-07T11:07:11.373105434")?,
},
OffloadedTimelineManifest {
timeline_id: TimelineId::from_str("f3def5823ad7080d2ea538d8e12163fa")?,
ancestor_timeline_id: Some(TimelineId::from_str(
"5c4df612fd159e63c1b7853fe94d97da",
)?),
ancestor_retain_lsn: Some(Lsn::from_str("0/1F79038")?),
archived_at: NaiveDateTime::from_str("2025-03-05T11:10:22.257901390")?,
},
],
};
assert_eq!(expected, TenantManifest::from_json_bytes(json.as_bytes())?);
Ok(())
}
}

View File

@@ -61,6 +61,7 @@ pub(crate) async fn upload_index_part(
.await
.with_context(|| format!("upload index part for '{tenant_shard_id} / {timeline_id}'"))
}
/// Serializes and uploads the given tenant manifest data to the remote storage.
pub(crate) async fn upload_tenant_manifest(
storage: &GenericRemoteStorage,
@@ -76,16 +77,14 @@ pub(crate) async fn upload_tenant_manifest(
});
pausable_failpoint!("before-upload-manifest-pausable");
let serialized = tenant_manifest.to_json_bytes()?;
let serialized = Bytes::from(serialized);
let tenant_manifest_site = serialized.len();
let serialized = Bytes::from(tenant_manifest.to_json_bytes()?);
let tenant_manifest_size = serialized.len();
let remote_path = remote_tenant_manifest_path(tenant_shard_id, generation);
storage
.upload_storage_object(
futures::stream::once(futures::future::ready(Ok(serialized))),
tenant_manifest_site,
tenant_manifest_size,
&remote_path,
cancel,
)

View File

@@ -268,7 +268,12 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
error_run += 1;
let backoff =
exponential_backoff_duration(error_run, BASE_BACKOFF_SECS, MAX_BACKOFF_SECS);
log_compaction_error(&err, Some((error_run, backoff)), cancel.is_cancelled());
log_compaction_error(
&err,
Some((error_run, backoff)),
cancel.is_cancelled(),
false,
);
continue;
}
}
@@ -285,6 +290,7 @@ pub(crate) fn log_compaction_error(
err: &CompactionError,
retry_info: Option<(u32, Duration)>,
task_cancelled: bool,
degrade_to_warning: bool,
) {
use CompactionError::*;
@@ -333,6 +339,7 @@ pub(crate) fn log_compaction_error(
}
} else {
match level {
Level::ERROR if degrade_to_warning => warn!("Compaction failed and discarded: {err:#}"),
Level::ERROR => error!("Compaction failed: {err:#}"),
Level::INFO => info!("Compaction failed: {err:#}"),
level => unimplemented!("unexpected level {level:?}"),

View File

@@ -1940,7 +1940,7 @@ impl Timeline {
)
.await;
if let Err(err) = &res {
log_compaction_error(err, None, cancel.is_cancelled());
log_compaction_error(err, None, cancel.is_cancelled(), false);
}
res
}
@@ -6361,10 +6361,33 @@ impl Timeline {
/// Reconstruct a value, using the given base image and WAL records in 'data'.
async fn reconstruct_value(
&self,
key: Key,
request_lsn: Lsn,
data: ValueReconstructState,
) -> Result<Bytes, PageReconstructError> {
self.reconstruct_value_inner(key, request_lsn, data, false)
.await
}
/// Reconstruct a value, using the given base image and WAL records in 'data'. It does not fire critical errors because
/// sometimes it is expected to fail due to unreplayable history described in <https://github.com/neondatabase/neon/issues/10395>.
async fn reconstruct_value_wo_critical_error(
&self,
key: Key,
request_lsn: Lsn,
data: ValueReconstructState,
) -> Result<Bytes, PageReconstructError> {
self.reconstruct_value_inner(key, request_lsn, data, true)
.await
}
async fn reconstruct_value_inner(
&self,
key: Key,
request_lsn: Lsn,
mut data: ValueReconstructState,
no_critical_error: bool,
) -> Result<Bytes, PageReconstructError> {
// Perform WAL redo if needed
data.records.reverse();
@@ -6421,7 +6444,9 @@ impl Timeline {
Ok(img) => img,
Err(walredo::Error::Cancelled) => return Err(PageReconstructError::Cancelled),
Err(walredo::Error::Other(err)) => {
critical!("walredo failure during page reconstruction: {err:?}");
if !no_critical_error {
critical!("walredo failure during page reconstruction: {err:?}");
}
return Err(PageReconstructError::WalRedo(
err.context("reconstruct a page image"),
));

View File

@@ -448,7 +448,7 @@ impl GcCompactionQueue {
) -> Result<CompactionOutcome, CompactionError> {
let res = self.iteration_inner(cancel, ctx, gc_block, timeline).await;
if let Err(err) = &res {
log_compaction_error(err, None, cancel.is_cancelled());
log_compaction_error(err, None, cancel.is_cancelled(), true);
}
match res {
Ok(res) => Ok(res),
@@ -2415,7 +2415,9 @@ impl Timeline {
} else {
lsn_split_points[i]
};
let img = self.reconstruct_value(key, request_lsn, state).await?;
let img = self
.reconstruct_value_wo_critical_error(key, request_lsn, state)
.await?;
Some((request_lsn, img))
} else {
None
@@ -3114,8 +3116,6 @@ impl Timeline {
// the key and LSN range are determined. However, to keep things simple here, we still
// create this writer, and discard the writer in the end.
let mut keys_processed = 0;
while let Some(((key, lsn, val), desc)) = merge_iter
.next_with_trace()
.await
@@ -3126,9 +3126,7 @@ impl Timeline {
return Err(CompactionError::ShuttingDown);
}
keys_processed += 1;
let should_yield = yield_for_l0
&& keys_processed % 1000 == 0
&& self
.l0_compaction_trigger
.notified()

View File

@@ -410,10 +410,13 @@ impl DeleteTimelineFlow {
// So indeed, the tenant manifest might refer to an offloaded timeline which has already been deleted.
// However, we handle this case in tenant loading code so the next time we attach, the issue is
// resolved.
tenant.store_tenant_manifest().await.map_err(|e| match e {
TenantManifestError::Cancelled => DeleteTimelineError::Cancelled,
_ => DeleteTimelineError::Other(e.into()),
})?;
tenant
.maybe_upload_tenant_manifest()
.await
.map_err(|err| match err {
TenantManifestError::Cancelled => DeleteTimelineError::Cancelled,
err => DeleteTimelineError::Other(err.into()),
})?;
*guard = Self::Finished;

View File

@@ -111,7 +111,7 @@ pub(crate) async fn offload_timeline(
// at the next restart attach it again.
// For that to happen, we'd need to make the manifest reflect our *intended* state,
// not our actual state of offloaded timelines.
tenant.store_tenant_manifest().await?;
tenant.maybe_upload_tenant_manifest().await?;
tracing::info!("Timeline offload complete (remaining arc refcount: {remaining_refcount})");

View File

@@ -276,6 +276,7 @@ pub(crate) fn apply_in_neon(
append,
clear,
will_init,
only_if,
} => {
use bytes::BufMut;
if *will_init {
@@ -288,6 +289,13 @@ pub(crate) fn apply_in_neon(
if *clear {
page.clear();
}
if let Some(only_if) = only_if {
if page != only_if.as_bytes() {
return Err(anyhow::anyhow!(
"the current image does not match the expected image, cannot append"
));
}
}
page.put_slice(append.as_bytes());
}
}

View File

@@ -21,7 +21,6 @@
#include "access/xlog.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "pagestore_client.h"
#include "common/hashfn.h"
#include "pgstat.h"
#include "port/pg_iovec.h"
@@ -43,6 +42,7 @@
#include "hll.h"
#include "bitmap.h"
#include "file_cache.h"
#include "neon.h"
#include "neon_lwlsncache.h"
#include "neon_perf_counters.h"
@@ -1563,8 +1563,12 @@ local_cache_pages(PG_FUNCTION_ARGS)
hash_seq_init(&status, lfc_hash);
while ((entry = hash_seq_search(&status)) != NULL)
{
for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
n_pages += GET_STATE(entry, i) == AVAILABLE;
/* Skip hole tags */
if (NInfoGetRelNumber(BufTagGetNRelFileInfo(entry->key)) != 0)
{
for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
n_pages += GET_STATE(entry, i) == AVAILABLE;
}
}
}
}
@@ -1592,16 +1596,19 @@ local_cache_pages(PG_FUNCTION_ARGS)
{
for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
{
if (GET_STATE(entry, i) == AVAILABLE)
if (NInfoGetRelNumber(BufTagGetNRelFileInfo(entry->key)) != 0)
{
fctx->record[n].pageoffs = entry->offset * BLOCKS_PER_CHUNK + i;
fctx->record[n].relfilenode = NInfoGetRelNumber(BufTagGetNRelFileInfo(entry->key));
fctx->record[n].reltablespace = NInfoGetSpcOid(BufTagGetNRelFileInfo(entry->key));
fctx->record[n].reldatabase = NInfoGetDbOid(BufTagGetNRelFileInfo(entry->key));
fctx->record[n].forknum = entry->key.forkNum;
fctx->record[n].blocknum = entry->key.blockNum + i;
fctx->record[n].accesscount = entry->access_count;
n += 1;
if (GET_STATE(entry, i) == AVAILABLE)
{
fctx->record[n].pageoffs = entry->offset * BLOCKS_PER_CHUNK + i;
fctx->record[n].relfilenode = NInfoGetRelNumber(BufTagGetNRelFileInfo(entry->key));
fctx->record[n].reltablespace = NInfoGetSpcOid(BufTagGetNRelFileInfo(entry->key));
fctx->record[n].reldatabase = NInfoGetDbOid(BufTagGetNRelFileInfo(entry->key));
fctx->record[n].forknum = entry->key.forkNum;
fctx->record[n].blocknum = entry->key.blockNum + i;
fctx->record[n].accesscount = entry->access_count;
n += 1;
}
}
}
}

52
pgxn/neon/file_cache.h Normal file
View File

@@ -0,0 +1,52 @@
/*-------------------------------------------------------------------------
*
* file_cache.h
* Local File Cache definitions
*
* Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*-------------------------------------------------------------------------
*/
#ifndef FILE_CACHE_h
#define FILE_CACHE_h
#include "neon_pgversioncompat.h"
/* GUCs */
extern bool lfc_store_prefetch_result;
/* functions for local file cache */
extern void lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum,
BlockNumber blkno, const void *const *buffers,
BlockNumber nblocks);
/* returns number of blocks read, with one bit set in *read for each */
extern int lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum,
BlockNumber blkno, void **buffers,
BlockNumber nblocks, bits8 *mask);
extern bool lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum,
BlockNumber blkno);
extern int lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum,
BlockNumber blkno, int nblocks, bits8 *bitmap);
extern void lfc_init(void);
extern bool lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
const void* buffer, XLogRecPtr lsn);
static inline bool
lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
void *buffer)
{
bits8 rv = 0;
return lfc_readv_select(rinfo, forkNum, blkno, &buffer, 1, &rv) == 1;
}
static inline void
lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
const void *buffer)
{
return lfc_writev(rinfo, forkNum, blkno, &buffer, 1);
}
#endif /* FILE_CACHE_H */

View File

@@ -1475,6 +1475,4 @@ pg_init_libpagestore(void)
}
memset(page_servers, 0, sizeof(page_servers));
lfc_init();
}

View File

@@ -29,6 +29,7 @@
#include "utils/guc_tables.h"
#include "extension_server.h"
#include "file_cache.h"
#include "neon.h"
#include "neon_lwlsncache.h"
#include "control_plane_connector.h"
@@ -434,6 +435,7 @@ _PG_init(void)
#endif
pg_init_libpagestore();
lfc_init();
pg_init_walproposer();
init_lwlsncache();

View File

@@ -47,6 +47,16 @@ extern uint32 WAIT_EVENT_NEON_WAL_DL;
#define WAIT_EVENT_NEON_WAL_DL WAIT_EVENT_WAL_READ
#endif
#define NEON_TAG "[NEON_SMGR] "
#define neon_log(tag, fmt, ...) ereport(tag, \
(errmsg(NEON_TAG fmt, ##__VA_ARGS__), \
errhidestmt(true), errhidecontext(true), errposition(0), internalerrposition(0)))
#define neon_shard_log(shard_no, tag, fmt, ...) ereport(tag, \
(errmsg(NEON_TAG "[shard %d] " fmt, shard_no, ##__VA_ARGS__), \
errhidestmt(true), errhidecontext(true), errposition(0), internalerrposition(0)))
extern void pg_init_libpagestore(void);
extern void pg_init_walproposer(void);
extern void pagestore_smgr_init(void);

View File

@@ -58,14 +58,6 @@ typedef struct
#define messageTag(m) (((const NeonMessage *)(m))->tag)
#define NEON_TAG "[NEON_SMGR] "
#define neon_log(tag, fmt, ...) ereport(tag, \
(errmsg(NEON_TAG fmt, ##__VA_ARGS__), \
errhidestmt(true), errhidecontext(true), errposition(0), internalerrposition(0)))
#define neon_shard_log(shard_no, tag, fmt, ...) ereport(tag, \
(errmsg(NEON_TAG "[shard %d] " fmt, shard_no, ##__VA_ARGS__), \
errhidestmt(true), errhidecontext(true), errposition(0), internalerrposition(0)))
/* SLRUs downloadable from page server */
typedef enum {
SLRU_CLOG,
@@ -234,7 +226,6 @@ extern char *neon_timeline;
extern char *neon_tenant;
extern int32 max_cluster_size;
extern int neon_protocol_version;
extern bool lfc_store_prefetch_result;
extern shardno_t get_shard_number(BufferTag* tag);
@@ -285,37 +276,4 @@ extern void set_cached_relsize(NRelFileInfo rinfo, ForkNumber forknum, BlockNumb
extern void update_cached_relsize(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber size);
extern void forget_cached_relsize(NRelFileInfo rinfo, ForkNumber forknum);
/* functions for local file cache */
extern void lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum,
BlockNumber blkno, const void *const *buffers,
BlockNumber nblocks);
/* returns number of blocks read, with one bit set in *read for each */
extern int lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum,
BlockNumber blkno, void **buffers,
BlockNumber nblocks, bits8 *mask);
extern bool lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum,
BlockNumber blkno);
extern int lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum,
BlockNumber blkno, int nblocks, bits8 *bitmap);
extern void lfc_init(void);
extern bool lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
const void* buffer, XLogRecPtr lsn);
static inline bool
lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
void *buffer)
{
bits8 rv = 0;
return lfc_readv_select(rinfo, forkNum, blkno, &buffer, 1, &rv) == 1;
}
static inline void
lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
const void *buffer)
{
return lfc_writev(rinfo, forkNum, blkno, &buffer, 1);
}
#endif /* PAGESTORE_CLIENT_H */

View File

@@ -65,6 +65,7 @@
#include "utils/timeout.h"
#include "bitmap.h"
#include "file_cache.h"
#include "neon.h"
#include "neon_lwlsncache.h"
#include "neon_perf_counters.h"
@@ -3564,7 +3565,7 @@ neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
for (int i = 0; i < nblocks; i++)
{
BlockNumber blkno = blocknum + i;
if (!BITMAP_ISSET(read, i))
if (!BITMAP_ISSET(read_pages, i))
continue;
#if PG_MAJORVERSION_NUM >= 17
@@ -3687,6 +3688,9 @@ neon_write(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, const vo
#ifndef DEBUG_COMPARE_LOCAL
/* This is a bit tricky. Check if the relation exists locally */
if (mdexists(reln, forknum))
#else
if (mdexists(reln, INIT_FORKNUM))
#endif
{
/* It exists locally. Guess it's unlogged then. */
#if PG_MAJORVERSION_NUM >= 17
@@ -3703,7 +3707,6 @@ neon_write(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, const vo
*/
return;
}
#endif
break;
case RELPERSISTENCE_PERMANENT:
@@ -3760,6 +3763,9 @@ neon_writev(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno,
#ifndef DEBUG_COMPARE_LOCAL
/* This is a bit tricky. Check if the relation exists locally */
if (mdexists(reln, forknum))
#else
if (mdexists(reln, INIT_FORKNUM))
#endif
{
/* It exists locally. Guess it's unlogged then. */
mdwritev(reln, forknum, blkno, buffers, nblocks, skipFsync);
@@ -3773,7 +3779,6 @@ neon_writev(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno,
*/
return;
}
#endif
break;
case RELPERSISTENCE_PERMANENT:
@@ -4187,6 +4192,8 @@ neon_start_unlogged_build(SMgrRelation reln)
#ifndef DEBUG_COMPARE_LOCAL
if (!IsParallelWorker())
mdcreate(reln, MAIN_FORKNUM, false);
#else
mdcreate(reln, INIT_FORKNUM, false);
#endif
}
@@ -4265,6 +4272,8 @@ neon_end_unlogged_build(SMgrRelation reln)
#ifndef DEBUG_COMPARE_LOCAL
/* use isRedo == true, so that we drop it immediately */
mdunlink(rinfob, forknum, true);
#else
mdunlink(rinfob, INIT_FORKNUM, true);
#endif
}
}

View File

@@ -800,7 +800,7 @@ impl ComputeHook {
#[cfg(test)]
pub(crate) mod tests {
use pageserver_api::shard::{ShardCount, ShardNumber};
use pageserver_api::shard::{DEFAULT_STRIPE_SIZE, ShardCount, ShardNumber};
use utils::id::TenantId;
use super::*;
@@ -808,6 +808,7 @@ pub(crate) mod tests {
#[test]
fn tenant_updates() -> anyhow::Result<()> {
let tenant_id = TenantId::generate();
let stripe_size = DEFAULT_STRIPE_SIZE;
let mut tenant_state = ComputeHookTenant::new(
TenantShardId {
tenant_id,
@@ -848,7 +849,7 @@ pub(crate) mod tests {
shard_count: ShardCount::new(2),
shard_number: ShardNumber(1),
},
stripe_size: ShardStripeSize(32768),
stripe_size,
preferred_az: None,
node_id: NodeId(1),
});
@@ -864,7 +865,7 @@ pub(crate) mod tests {
shard_count: ShardCount::new(2),
shard_number: ShardNumber(0),
},
stripe_size: ShardStripeSize(32768),
stripe_size,
preferred_az: None,
node_id: NodeId(1),
});
@@ -874,7 +875,7 @@ pub(crate) mod tests {
anyhow::bail!("Wrong send result");
};
assert_eq!(request.shards.len(), 2);
assert_eq!(request.stripe_size, Some(ShardStripeSize(32768)));
assert_eq!(request.stripe_size, Some(stripe_size));
// Simulate successful send
*guard = Some(ComputeRemoteState {

View File

@@ -44,6 +44,15 @@ pub(crate) struct StorageControllerMetricGroup {
/// Size of the in-memory map of pageserver_nodes
pub(crate) storage_controller_pageserver_nodes: measured::Gauge,
/// Count of how many pageserver nodes from in-memory map have https configured
pub(crate) storage_controller_https_pageserver_nodes: measured::Gauge,
/// Size of the in-memory map of safekeeper_nodes
pub(crate) storage_controller_safekeeper_nodes: measured::Gauge,
/// Count of how many safekeeper nodes from in-memory map have https configured
pub(crate) storage_controller_https_safekeeper_nodes: measured::Gauge,
/// Reconciler tasks completed, broken down by success/failure/cancelled
pub(crate) storage_controller_reconcile_complete:
measured::CounterVec<ReconcileCompleteLabelGroupSet>,

View File

@@ -89,6 +89,10 @@ impl Node {
self.scheduling = scheduling
}
pub(crate) fn has_https_port(&self) -> bool {
self.listen_https_port.is_some()
}
/// Does this registration request match `self`? This is used when deciding whether a registration
/// request should be allowed to update an existing record with the same node ID.
pub(crate) fn registration_match(&self, register_req: &NodeRegisterRequest) -> bool {

View File

@@ -89,6 +89,9 @@ impl Safekeeper {
pub(crate) fn availability(&self) -> SafekeeperState {
self.availability.clone()
}
pub(crate) fn has_https_port(&self) -> bool {
self.listen_https_port.is_some()
}
/// Perform an operation (which is given a [`SafekeeperClient`]) with retries
#[allow(clippy::too_many_arguments)]
pub(crate) async fn with_client_retries<T, O, F>(

View File

@@ -43,7 +43,7 @@ use pageserver_api::models::{
TimelineInfo, TopTenantShardItem, TopTenantShardsRequest,
};
use pageserver_api::shard::{
ShardCount, ShardIdentity, ShardNumber, ShardStripeSize, TenantShardId,
DEFAULT_STRIPE_SIZE, ShardCount, ShardIdentity, ShardNumber, ShardStripeSize, TenantShardId,
};
use pageserver_api::upcall_api::{
ReAttachRequest, ReAttachResponse, ReAttachResponseTenant, ValidateRequest, ValidateResponse,
@@ -1509,6 +1509,10 @@ impl Service {
.metrics_group
.storage_controller_pageserver_nodes
.set(nodes.len() as i64);
metrics::METRICS_REGISTRY
.metrics_group
.storage_controller_https_pageserver_nodes
.set(nodes.values().filter(|n| n.has_https_port()).count() as i64);
tracing::info!("Loading safekeepers from database...");
let safekeepers = persistence
@@ -1526,6 +1530,14 @@ impl Service {
let safekeepers: HashMap<NodeId, Safekeeper> =
safekeepers.into_iter().map(|n| (n.get_id(), n)).collect();
tracing::info!("Loaded {} safekeepers from database.", safekeepers.len());
metrics::METRICS_REGISTRY
.metrics_group
.storage_controller_safekeeper_nodes
.set(safekeepers.len() as i64);
metrics::METRICS_REGISTRY
.metrics_group
.storage_controller_https_safekeeper_nodes
.set(safekeepers.values().filter(|s| s.has_https_port()).count() as i64);
tracing::info!("Loading shards from database...");
let mut tenant_shard_persistence = persistence.load_active_tenant_shards().await?;
@@ -2742,7 +2754,7 @@ impl Service {
count: tenant_shard_id.shard_count,
// We only import un-sharded or single-sharded tenants, so stripe
// size can be made up arbitrarily here.
stripe_size: ShardParameters::DEFAULT_STRIPE_SIZE,
stripe_size: DEFAULT_STRIPE_SIZE,
},
placement_policy: Some(placement_policy),
config: req.config.tenant_conf,
@@ -6014,9 +6026,21 @@ impl Service {
.max()
.expect("We already validated >0 shards");
// FIXME: we have no way to recover the shard stripe size from contents of remote storage: this will
// only work if they were using the default stripe size.
let stripe_size = ShardParameters::DEFAULT_STRIPE_SIZE;
// Find the tenant's stripe size. This wasn't always persisted in the tenant manifest, so
// fall back to the original default stripe size of 32768 (256 MB) if it's not specified.
const ORIGINAL_STRIPE_SIZE: ShardStripeSize = ShardStripeSize(32768);
let stripe_size = scan_result
.shards
.iter()
.find(|s| s.tenant_shard_id.shard_count == shard_count && s.generation == generation)
.expect("we validated >0 shards above")
.stripe_size
.unwrap_or_else(|| {
if shard_count.count() > 1 {
warn!("unknown stripe size, assuming {ORIGINAL_STRIPE_SIZE}");
}
ORIGINAL_STRIPE_SIZE
});
let (response, waiters) = self
.do_tenant_create(TenantCreateRequest {
@@ -6242,6 +6266,10 @@ impl Service {
.metrics_group
.storage_controller_pageserver_nodes
.set(locked.nodes.len() as i64);
metrics::METRICS_REGISTRY
.metrics_group
.storage_controller_https_pageserver_nodes
.set(locked.nodes.values().filter(|n| n.has_https_port()).count() as i64);
locked.scheduler.node_remove(node_id);
@@ -6333,6 +6361,10 @@ impl Service {
.metrics_group
.storage_controller_pageserver_nodes
.set(nodes.len() as i64);
metrics::METRICS_REGISTRY
.metrics_group
.storage_controller_https_pageserver_nodes
.set(nodes.values().filter(|n| n.has_https_port()).count() as i64);
}
}
@@ -6557,6 +6589,10 @@ impl Service {
.metrics_group
.storage_controller_pageserver_nodes
.set(locked.nodes.len() as i64);
metrics::METRICS_REGISTRY
.metrics_group
.storage_controller_https_pageserver_nodes
.set(locked.nodes.values().filter(|n| n.has_https_port()).count() as i64);
match registration_status {
RegistrationStatus::New => {
@@ -7270,7 +7306,7 @@ impl Service {
}
// Eventual consistency: if an earlier reconcile job failed, and the shard is still
// dirty, spawn another rone
// dirty, spawn another one
if self
.maybe_reconcile_shard(shard, &pageservers, ReconcilerPriority::Normal)
.is_some()
@@ -7829,7 +7865,7 @@ impl Service {
// old, persisted stripe size.
let new_stripe_size = match candidate.id.shard_count.count() {
0 => panic!("invalid shard count 0"),
1 => Some(ShardParameters::DEFAULT_STRIPE_SIZE),
1 => Some(DEFAULT_STRIPE_SIZE),
2.. => None,
};

View File

@@ -5,6 +5,7 @@ use std::time::Duration;
use super::safekeeper_reconciler::ScheduleRequest;
use crate::heartbeater::SafekeeperState;
use crate::metrics;
use crate::persistence::{
DatabaseError, SafekeeperTimelineOpKind, TimelinePendingOpPersistence, TimelinePersistence,
};
@@ -590,6 +591,20 @@ impl Service {
}
}
locked.safekeepers = Arc::new(safekeepers);
metrics::METRICS_REGISTRY
.metrics_group
.storage_controller_safekeeper_nodes
.set(locked.safekeepers.len() as i64);
metrics::METRICS_REGISTRY
.metrics_group
.storage_controller_https_safekeeper_nodes
.set(
locked
.safekeepers
.values()
.filter(|s| s.has_https_port())
.count() as i64,
);
}
Ok(())
}

View File

@@ -2000,7 +2000,7 @@ pub(crate) mod tests {
use std::rc::Rc;
use pageserver_api::controller_api::NodeAvailability;
use pageserver_api::shard::{ShardCount, ShardNumber};
use pageserver_api::shard::{DEFAULT_STRIPE_SIZE, ShardCount, ShardNumber};
use rand::SeedableRng;
use rand::rngs::StdRng;
use utils::id::TenantId;
@@ -2012,6 +2012,7 @@ pub(crate) mod tests {
let tenant_id = TenantId::generate();
let shard_number = ShardNumber(0);
let shard_count = ShardCount::new(1);
let stripe_size = DEFAULT_STRIPE_SIZE;
let tenant_shard_id = TenantShardId {
tenant_id,
@@ -2020,12 +2021,7 @@ pub(crate) mod tests {
};
TenantShard::new(
tenant_shard_id,
ShardIdentity::new(
shard_number,
shard_count,
pageserver_api::shard::ShardStripeSize(32768),
)
.unwrap(),
ShardIdentity::new(shard_number, shard_count, stripe_size).unwrap(),
policy,
None,
)
@@ -2045,6 +2041,7 @@ pub(crate) mod tests {
shard_count: ShardCount,
preferred_az: Option<AvailabilityZone>,
) -> Vec<TenantShard> {
let stripe_size = DEFAULT_STRIPE_SIZE;
(0..shard_count.count())
.map(|i| {
let shard_number = ShardNumber(i);
@@ -2056,12 +2053,7 @@ pub(crate) mod tests {
};
TenantShard::new(
tenant_shard_id,
ShardIdentity::new(
shard_number,
shard_count,
pageserver_api::shard::ShardStripeSize(32768),
)
.unwrap(),
ShardIdentity::new(shard_number, shard_count, stripe_size).unwrap(),
policy.clone(),
preferred_az.clone(),
)

View File

@@ -417,6 +417,19 @@ class NeonLocalCli(AbstractNeonCli):
cmd.append(f"--instance-id={instance_id}")
return self.raw_cli(cmd)
def object_storage_start(self, timeout_in_seconds: int | None = None):
cmd = ["object-storage", "start"]
if timeout_in_seconds is not None:
cmd.append(f"--start-timeout={timeout_in_seconds}s")
return self.raw_cli(cmd)
def object_storage_stop(self, immediate: bool):
cmd = ["object-storage", "stop"]
if immediate:
cmd.extend(["-m", "immediate"])
return self.raw_cli(cmd)
pass
def pageserver_start(
self,
id: int,

View File

@@ -1023,6 +1023,8 @@ class NeonEnvBuilder:
self.env.broker.assert_no_errors()
self.env.object_storage.assert_no_errors()
try:
self.overlay_cleanup_teardown()
except Exception as e:
@@ -1118,6 +1120,8 @@ class NeonEnv:
pagectl_env_vars["RUST_LOG"] = self.rust_log_override
self.pagectl = Pagectl(extra_env=pagectl_env_vars, binpath=self.neon_binpath)
self.object_storage = ObjectStorage(self)
# The URL for the pageserver to use as its control_plane_api config
if config.storage_controller_port_override is not None:
log.info(
@@ -1173,6 +1177,7 @@ class NeonEnv:
},
"safekeepers": [],
"pageservers": [],
"object_storage": {"port": self.port_distributor.get_port()},
"generate_local_ssl_certs": self.generate_local_ssl_certs,
}
@@ -1408,6 +1413,8 @@ class NeonEnv:
self.storage_controller.on_safekeeper_deploy(sk_id, body)
self.storage_controller.safekeeper_scheduling_policy(sk_id, "Active")
self.object_storage.start(timeout_in_seconds=timeout_in_seconds)
def stop(self, immediate=False, ps_assert_metric_no_errors=False, fail_on_endpoint_errors=True):
"""
After this method returns, there should be no child processes running.
@@ -1425,6 +1432,8 @@ class NeonEnv:
except Exception as e:
raise_later = e
self.object_storage.stop(immediate=immediate)
# Stop storage controller before pageservers: we don't want it to spuriously
# detect a pageserver "failure" during test teardown
self.storage_controller.stop(immediate=immediate)
@@ -2635,6 +2644,26 @@ class NeonStorageController(MetricsGetter, LogUtils):
self.stop(immediate=True)
class ObjectStorage(LogUtils):
def __init__(self, env: NeonEnv):
service_dir = env.repo_dir / "object_storage"
super().__init__(logfile=service_dir / "object_storage.log")
self.conf_path = service_dir / "object_storage.json"
self.env = env
def base_url(self):
return json.loads(self.conf_path.read_text())["listen"]
def start(self, timeout_in_seconds: int | None = None):
self.env.neon_cli.object_storage_start(timeout_in_seconds)
def stop(self, immediate: bool = False):
self.env.neon_cli.object_storage_stop(immediate)
def assert_no_errors(self):
assert_no_errors(self.logfile, "object_storage", [])
class NeonProxiedStorageController(NeonStorageController):
def __init__(self, env: NeonEnv, proxy_port: int, auth_enabled: bool, use_https: bool):
super().__init__(env, proxy_port, auth_enabled, use_https)

View File

@@ -808,9 +808,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tokio"
version = "1.38.0"
version = "1.38.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba4f4a02a7a80d6f274636f0aa95c7e383b912d41fe721a31f29e29698585a4a"
checksum = "68722da18b0fc4a05fdc1120b302b82051265792a1e1b399086e9b204b10ad3d"
dependencies = [
"backtrace",
"bytes",

View File

@@ -148,9 +148,9 @@ def test_create_snapshot(
env = neon_env_builder.init_start(
initial_tenant_conf={
# Miniature layers to enable generating non-trivial layer map without writing lots of data.
"checkpoint_distance": f"{128 * 1024}",
"compaction_threshold": "1",
"compaction_target_size": f"{128 * 1024}",
"checkpoint_distance": f"{256 * 1024}",
"compaction_threshold": "5",
"compaction_target_size": f"{256 * 1024}",
}
)
endpoint = env.endpoints.create_start("main")
@@ -492,6 +492,13 @@ HISTORIC_DATA_SETS = [
PgVersion.V17,
"https://neon-github-public-dev.s3.eu-central-1.amazonaws.com/compatibility-data-snapshots/2025-02-07-pgv17-nogenerations.tar.zst",
),
# Tenant manifest v1.
HistoricDataSet(
"2025-04-08-tenant-manifest-v1",
TenantId("c547c28588abf1d7b7139ff1f1158345"),
PgVersion.V17,
"https://neon-github-public-dev.s3.eu-central-1.amazonaws.com/compatibility-data-snapshots/2025-04-08-pgv17-tenant-manifest-v1.tar.zst",
),
]

View File

@@ -49,6 +49,8 @@ def test_lfc_resize(neon_simple_env: NeonEnv, pg_bin: PgBin):
conn = endpoint.connect()
cur = conn.cursor()
cur.execute("create extension neon")
def get_lfc_size() -> tuple[int, int]:
lfc_file_path = endpoint.lfc_path()
lfc_file_size = lfc_file_path.stat().st_size
@@ -103,3 +105,23 @@ def test_lfc_resize(neon_simple_env: NeonEnv, pg_bin: PgBin):
time.sleep(1)
assert int(lfc_file_blocks) <= 128 * 1024
# Now test that number of rows returned by local_cache is the same as file_cache_used_pages.
# Perform several iterations to make cache cache content stabilized.
nretries = 10
while True:
cur.execute("select count(*) from local_cache")
local_cache_size = cur.fetchall()[0][0]
cur.execute(
"select lfc_value::bigint FROM neon_lfc_stats where lfc_key='file_cache_used_pages'"
)
used_pages = cur.fetchall()[0][0]
if local_cache_size == used_pages or nretries == 0:
break
nretries = nretries - 1
time.sleep(1)
assert local_cache_size == used_pages

View File

@@ -134,10 +134,11 @@ def test_cli_start_stop(neon_env_builder: NeonEnvBuilder):
"""
env = neon_env_builder.init_start()
# Stop default ps/sk
# Stop default services
env.neon_cli.pageserver_stop(env.pageserver.id)
env.neon_cli.safekeeper_stop()
env.neon_cli.storage_controller_stop(False)
env.neon_cli.object_storage_stop(False)
env.neon_cli.storage_broker_stop()
# Keep NeonEnv state up to date, it usually owns starting/stopping services
@@ -179,11 +180,13 @@ def test_cli_start_stop_multi(neon_env_builder: NeonEnvBuilder):
# Using the single-pageserver shortcut property throws when there are multiple pageservers
with pytest.raises(AssertionError):
_drop = env.pageserver
_ = env.pageserver
env.neon_cli.safekeeper_stop(neon_env_builder.safekeepers_id_start + 1)
env.neon_cli.safekeeper_stop(neon_env_builder.safekeepers_id_start + 2)
env.neon_cli.object_storage_stop(False)
# Stop this to get out of the way of the following `start`
env.neon_cli.storage_controller_stop(False)
env.neon_cli.storage_broker_stop()

View File

@@ -0,0 +1,56 @@
from time import time
import pytest
from aiohttp import ClientSession
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv
from jwcrypto import jwk, jwt
@pytest.mark.asyncio
async def test_object_storage_insert_retrieve_delete(neon_simple_env: NeonEnv):
"""
Inserts, retrieves, and deletes test file using a JWT token
"""
env = neon_simple_env
ep = env.endpoints.create_start(branch_name="main")
tenant_id = str(ep.tenant_id)
timeline_id = str(ep.show_timeline_id())
endpoint_id = ep.endpoint_id
key_path = env.repo_dir / "auth_private_key.pem"
key = jwk.JWK.from_pem(key_path.read_bytes())
claims = {
"tenant_id": tenant_id,
"timeline_id": timeline_id,
"endpoint_id": endpoint_id,
"exp": round(time()) + 99,
}
log.info(f"key path {key_path}\nclaims {claims}")
token = jwt.JWT(header={"alg": "EdDSA"}, claims=claims)
token.make_signed_token(key)
token = token.serialize()
base_url = env.object_storage.base_url()
key = f"http://{base_url}/{tenant_id}/{timeline_id}/{endpoint_id}/key"
headers = {"Authorization": f"Bearer {token}"}
log.info(f"cache key url {key}")
log.info(f"token {token}")
async with ClientSession(headers=headers) as session:
async with session.get(key) as res:
assert res.status == 404, f"Non-existing file is present: {res}"
data = b"cheburash"
async with session.put(key, data=data) as res:
assert res.status == 200, f"Error writing file: {res}"
async with session.get(key) as res:
read_data = await res.read()
assert data == read_data
async with session.delete(key) as res:
assert res.status == 200, f"Error removing file {res}"
async with session.get(key) as res:
assert res.status == 404, f"File was not deleted: {res}"

View File

@@ -95,6 +95,7 @@ def test_storage_controller_smoke(
env.pageservers[1].start()
for sk in env.safekeepers:
sk.start()
env.object_storage.start()
# The pageservers we started should have registered with the sharding service on startup
nodes = env.storage_controller.node_list()
@@ -346,6 +347,7 @@ def prepare_onboarding_env(
env = neon_env_builder.init_configs()
env.broker.start()
env.storage_controller.start()
env.object_storage.start()
# This is the pageserver where we'll initially create the tenant. Run it in emergency
# mode so that it doesn't talk to storage controller, and do not register it.
@@ -675,7 +677,7 @@ def test_storage_controller_compute_hook(
env.storage_controller.tenant_shard_split(env.initial_tenant, shard_count=2)
expect = {
"tenant_id": str(env.initial_tenant),
"stripe_size": 32768,
"stripe_size": 2048,
"shards": [
{"node_id": int(env.pageservers[1].id), "shard_number": 0},
{"node_id": int(env.pageservers[1].id), "shard_number": 1},
@@ -4109,6 +4111,7 @@ def test_storcon_create_delete_sk_down(neon_env_builder: NeonEnvBuilder, restart
env.storage_controller.allowed_errors.extend(
[
".*Call to safekeeper.* management API still failed after.*",
".*Call to safekeeper.* management API failed, will retry.*",
".*reconcile_one.*tenant_id={tenant_id}.*Call to safekeeper.* management API still failed after.*",
]
)

View File

@@ -75,7 +75,7 @@ def test_scrubber_tenant_snapshot(neon_env_builder: NeonEnvBuilder, shard_count:
tenant_shard_ids = [TenantShardId(tenant_id, 0, 0)]
# Let shards finish rescheduling to other pageservers: this makes the rest of the test more stable
# is it won't overlap with migrations
# as it won't overlap with migrations
env.storage_controller.reconcile_until_idle(max_interval=0.1, timeout_secs=120)
output_path = neon_env_builder.test_output_dir / "snapshot"
@@ -87,6 +87,13 @@ def test_scrubber_tenant_snapshot(neon_env_builder: NeonEnvBuilder, shard_count:
workload.stop()
# Disable scheduling, so the storage controller doesn't migrate shards around
# while we are stopping pageservers
env.storage_controller.tenant_policy_update(tenant_id, {"scheduling": "Stop"})
env.storage_controller.allowed_errors.extend(
[".*Scheduling is disabled by policy Stop.*", ".*Skipping reconcile for policy Stop.*"]
)
# Stop pageservers
for pageserver in env.pageservers:
pageserver.stop()
@@ -127,9 +134,16 @@ def test_scrubber_tenant_snapshot(neon_env_builder: NeonEnvBuilder, shard_count:
for pageserver in env.pageservers:
pageserver.start()
# Turn scheduling back on.
# We don't care about optimizations, so enable only essential scheduling
env.storage_controller.tenant_policy_update(tenant_id, {"scheduling": "Essential"})
# Check we can read everything
workload.validate()
# Reconcile to avoid a race between test shutdown and background reconciliation (#11278)
env.storage_controller.reconcile_until_idle()
def drop_local_state(env: NeonEnv, tenant_id: TenantId):
env.storage_controller.tenant_policy_update(tenant_id, {"placement": "Detached"})

View File

@@ -318,7 +318,7 @@ def test_timeline_offload_persist(neon_env_builder: NeonEnvBuilder, delete_timel
neon_env_builder.pageserver_remote_storage,
prefix=f"tenants/{str(tenant_id)}/",
)
assert_prefix_empty(
assert_prefix_not_empty(
neon_env_builder.pageserver_remote_storage,
prefix=f"tenants/{str(tenant_id)}/tenant-manifest",
)
@@ -387,7 +387,7 @@ def test_timeline_offload_persist(neon_env_builder: NeonEnvBuilder, delete_timel
sum_again = endpoint.safe_psql("SELECT sum(key) from foo where key < 500")
assert sum == sum_again
assert_prefix_empty(
assert_prefix_not_empty(
neon_env_builder.pageserver_remote_storage,
prefix=f"tenants/{str(env.initial_tenant)}/tenant-manifest",
)
@@ -924,7 +924,7 @@ def test_timeline_offload_generations(neon_env_builder: NeonEnvBuilder):
neon_env_builder.pageserver_remote_storage,
prefix=f"tenants/{str(tenant_id)}/",
)
assert_prefix_empty(
assert_prefix_not_empty(
neon_env_builder.pageserver_remote_storage,
prefix=f"tenants/{str(tenant_id)}/tenant-manifest",
)