mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-31 09:10:38 +00:00
Compare commits
94 Commits
skyzh/feat
...
jcsp/gener
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c63a952b78 | ||
|
|
35e4b43531 | ||
|
|
584c0d3c7b | ||
|
|
84023207ce | ||
|
|
35fa75699b | ||
|
|
f77aa463c6 | ||
|
|
4492d40c37 | ||
|
|
2f58f39648 | ||
|
|
10b85c0d9a | ||
|
|
cd6367b5ae | ||
|
|
79f9f7c5f8 | ||
|
|
ef5ce1635c | ||
|
|
5aecd8c4fd | ||
|
|
5266bf4552 | ||
|
|
a1bcad2382 | ||
|
|
4dd60bf7cd | ||
|
|
3eff65618d | ||
|
|
265d3b4352 | ||
|
|
000330054b | ||
|
|
ddb6453f56 | ||
|
|
bc95b8f1f5 | ||
|
|
5b7d3e39d6 | ||
|
|
034bebcfcd | ||
|
|
9e0e2a2a9a | ||
|
|
34160a15ca | ||
|
|
f3a9c2d788 | ||
|
|
50da1b7983 | ||
|
|
4a0e2d1290 | ||
|
|
980d3ba8b0 | ||
|
|
fd836d8c45 | ||
|
|
67b17034ab | ||
|
|
930de712ee | ||
|
|
dd033d9138 | ||
|
|
5a217791fd | ||
|
|
c9a007d05b | ||
|
|
696b49eeba | ||
|
|
206420d96a | ||
|
|
416026381f | ||
|
|
d9755becab | ||
|
|
9cb255be97 | ||
|
|
57a44dcc01 | ||
|
|
1afc6337fb | ||
|
|
74058e196a | ||
|
|
a116f6656f | ||
|
|
2c7b97245a | ||
|
|
6efddbf526 | ||
|
|
7c4d79f4db | ||
|
|
8c2ff87f1a | ||
|
|
23fc247a03 | ||
|
|
d8dc4425f8 | ||
|
|
18159b7695 | ||
|
|
c1bc9c0f70 | ||
|
|
2de5efa208 | ||
|
|
d330eac4bc | ||
|
|
3ebceeda71 | ||
|
|
31729d6f4d | ||
|
|
7e0e3517c1 | ||
|
|
c4fc6e433d | ||
|
|
c36cba28d6 | ||
|
|
8eaa4015de | ||
|
|
10e927ee3e | ||
|
|
bb3a59f275 | ||
|
|
a0ed43cc12 | ||
|
|
99dc5a5c27 | ||
|
|
54db1f5d8a | ||
|
|
404b25e45f | ||
|
|
f4dba9f907 | ||
|
|
4ec45bc7dc | ||
|
|
a00d4a8d8c | ||
|
|
43c9a09d8f | ||
|
|
3edd7ece40 | ||
|
|
504fe9c2b0 | ||
|
|
10df237a81 | ||
|
|
d40f8475a5 | ||
|
|
164f916a40 | ||
|
|
4ebc29768c | ||
|
|
bae62916dc | ||
|
|
5e2b8b376c | ||
|
|
54ec7919b8 | ||
|
|
e0bed0732c | ||
|
|
9e92121cc3 | ||
|
|
50a9508f4f | ||
|
|
f61402be24 | ||
|
|
975e4f2235 | ||
|
|
537eca489e | ||
|
|
de4882886e | ||
|
|
6982288426 | ||
|
|
ccfcfa1098 | ||
|
|
e2c793c897 | ||
|
|
0fdc492aa4 | ||
|
|
787b099541 | ||
|
|
3af693749d | ||
|
|
6f9ae6bb5f | ||
|
|
16d77dcb73 |
3
Cargo.lock
generated
3
Cargo.lock
generated
@@ -1001,6 +1001,7 @@ dependencies = [
|
||||
"comfy-table",
|
||||
"compute_api",
|
||||
"git-version",
|
||||
"hyper",
|
||||
"nix 0.26.2",
|
||||
"once_cell",
|
||||
"pageserver_api",
|
||||
@@ -1016,6 +1017,7 @@ dependencies = [
|
||||
"storage_broker",
|
||||
"tar",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"toml",
|
||||
"tracing",
|
||||
"url",
|
||||
@@ -2684,6 +2686,7 @@ dependencies = [
|
||||
"bytes",
|
||||
"const_format",
|
||||
"enum-map",
|
||||
"hex",
|
||||
"postgres_ffi",
|
||||
"serde",
|
||||
"serde_json",
|
||||
|
||||
@@ -12,6 +12,7 @@ git-version.workspace = true
|
||||
nix.workspace = true
|
||||
once_cell.workspace = true
|
||||
postgres.workspace = true
|
||||
hyper.workspace = true
|
||||
regex.workspace = true
|
||||
reqwest = { workspace = true, features = ["blocking", "json"] }
|
||||
serde.workspace = true
|
||||
@@ -20,6 +21,7 @@ serde_with.workspace = true
|
||||
tar.workspace = true
|
||||
thiserror.workspace = true
|
||||
toml.workspace = true
|
||||
tokio.workspace = true
|
||||
url.workspace = true
|
||||
# Note: Do not directly depend on pageserver or safekeeper; use pageserver_api or safekeeper_api
|
||||
# instead, so that recompile times are better.
|
||||
|
||||
104
control_plane/src/attachment_service.rs
Normal file
104
control_plane/src/attachment_service.rs
Normal file
@@ -0,0 +1,104 @@
|
||||
use crate::{background_process, local_env::LocalEnv};
|
||||
use anyhow::anyhow;
|
||||
use pageserver_api::control_api::HexTenantId;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::{path::PathBuf, process::Child};
|
||||
use utils::id::{NodeId, TenantId};
|
||||
|
||||
pub struct AttachmentService {
|
||||
env: LocalEnv,
|
||||
listen: String,
|
||||
path: PathBuf,
|
||||
}
|
||||
|
||||
const COMMAND: &str = "attachment_service";
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct AttachHookRequest {
|
||||
pub tenant_id: HexTenantId,
|
||||
pub pageserver_id: Option<NodeId>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct AttachHookResponse {
|
||||
pub gen: Option<u32>,
|
||||
}
|
||||
|
||||
impl AttachmentService {
|
||||
pub fn from_env(env: &LocalEnv) -> Self {
|
||||
let path = env.base_data_dir.join("attachments.json");
|
||||
|
||||
// Makes no sense to construct this if pageservers aren't going to use it: assume
|
||||
// pageservers have control plane API set
|
||||
let listen_url = env.pageserver.control_plane_api.clone().unwrap();
|
||||
|
||||
let listen = format!(
|
||||
"{}:{}",
|
||||
listen_url.host_str().unwrap(),
|
||||
listen_url.port().unwrap()
|
||||
);
|
||||
|
||||
Self {
|
||||
env: env.clone(),
|
||||
path,
|
||||
listen,
|
||||
}
|
||||
}
|
||||
|
||||
fn pid_file(&self) -> PathBuf {
|
||||
self.env.base_data_dir.join("attachment_service.pid")
|
||||
}
|
||||
|
||||
pub fn start(&self) -> anyhow::Result<Child> {
|
||||
let path_str = self.path.to_string_lossy();
|
||||
|
||||
background_process::start_process(
|
||||
COMMAND,
|
||||
&self.env.base_data_dir,
|
||||
&self.env.attachment_service_bin(),
|
||||
["-l", &self.listen, "-p", &path_str],
|
||||
[],
|
||||
background_process::InitialPidFile::Create(&self.pid_file()),
|
||||
// TODO: a real status check
|
||||
|| Ok(true),
|
||||
)
|
||||
}
|
||||
|
||||
pub fn stop(&self, immediate: bool) -> anyhow::Result<()> {
|
||||
background_process::stop_process(immediate, COMMAND, &self.pid_file())
|
||||
}
|
||||
|
||||
/// Call into the attach_hook API, for use before handing out attachments to pageservers
|
||||
pub fn attach_hook(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
pageserver_id: NodeId,
|
||||
) -> anyhow::Result<Option<u32>> {
|
||||
use hyper::StatusCode;
|
||||
|
||||
let url = self
|
||||
.env
|
||||
.pageserver
|
||||
.control_plane_api
|
||||
.clone()
|
||||
.unwrap()
|
||||
.join("attach_hook")
|
||||
.unwrap();
|
||||
let client = reqwest::blocking::ClientBuilder::new()
|
||||
.build()
|
||||
.expect("Failed to construct http client");
|
||||
|
||||
let request = AttachHookRequest {
|
||||
tenant_id: HexTenantId::new(tenant_id),
|
||||
pageserver_id: Some(pageserver_id),
|
||||
};
|
||||
|
||||
let response = client.post(url).json(&request).send()?;
|
||||
if response.status() != StatusCode::OK {
|
||||
return Err(anyhow!("Unexpected status {0}", response.status()));
|
||||
}
|
||||
|
||||
let response = response.json::<AttachHookResponse>()?;
|
||||
Ok(response.gen)
|
||||
}
|
||||
}
|
||||
264
control_plane/src/bin/attachment_service.rs
Normal file
264
control_plane/src/bin/attachment_service.rs
Normal file
@@ -0,0 +1,264 @@
|
||||
/// The attachment service mimics the aspects of the control plane API
|
||||
/// that are required for a pageserver to operate.
|
||||
///
|
||||
/// This enables running & testing pageservers without a full-blown
|
||||
/// deployment of the Neon cloud platform.
|
||||
///
|
||||
use anyhow::anyhow;
|
||||
use clap::Parser;
|
||||
use hyper::StatusCode;
|
||||
use hyper::{Body, Request, Response};
|
||||
use pageserver_api::control_api::*;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
use utils::logging::{self, LogFormat};
|
||||
|
||||
use utils::{
|
||||
http::{
|
||||
endpoint::{self},
|
||||
error::ApiError,
|
||||
json::{json_request, json_response},
|
||||
RequestExt, RouterBuilder,
|
||||
},
|
||||
id::{NodeId, TenantId},
|
||||
tcp_listener,
|
||||
};
|
||||
|
||||
use control_plane::attachment_service::{AttachHookRequest, AttachHookResponse};
|
||||
|
||||
#[derive(Parser)]
|
||||
#[command(author, version, about, long_about = None)]
|
||||
#[command(arg_required_else_help(true))]
|
||||
struct Cli {
|
||||
#[arg(short, long)]
|
||||
listen: String,
|
||||
|
||||
#[arg(short, long)]
|
||||
path: PathBuf,
|
||||
}
|
||||
|
||||
// The persistent state of each Tenant
|
||||
#[derive(Serialize, Deserialize, Clone)]
|
||||
struct TenantState {
|
||||
// Currently attached pageserver
|
||||
pageserver: Option<NodeId>,
|
||||
|
||||
// Latest generation number: next time we attach, increment this
|
||||
// and use the incremented number when attaching
|
||||
generation: u32,
|
||||
}
|
||||
|
||||
fn to_hex_map<S, V>(input: &HashMap<TenantId, V>, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::Serializer,
|
||||
V: Clone + Serialize,
|
||||
{
|
||||
eprintln!("to_hex_map");
|
||||
let transformed = input
|
||||
.iter()
|
||||
.map(|(k, v)| (HexTenantId::new(k.clone()), v.clone()));
|
||||
|
||||
transformed
|
||||
.collect::<HashMap<HexTenantId, V>>()
|
||||
.serialize(serializer)
|
||||
}
|
||||
|
||||
fn from_hex_map<'de, D, V>(deserializer: D) -> Result<HashMap<TenantId, V>, D::Error>
|
||||
where
|
||||
D: serde::de::Deserializer<'de>,
|
||||
V: Deserialize<'de>,
|
||||
{
|
||||
eprintln!("from_hex_map");
|
||||
let hex_map = HashMap::<HexTenantId, V>::deserialize(deserializer)?;
|
||||
|
||||
Ok(hex_map.into_iter().map(|(k, v)| (k.take(), v)).collect())
|
||||
}
|
||||
|
||||
// Top level state available to all HTTP handlers
|
||||
#[derive(Serialize, Deserialize)]
|
||||
struct PersistentState {
|
||||
#[serde(serialize_with = "to_hex_map", deserialize_with = "from_hex_map")]
|
||||
tenants: HashMap<TenantId, TenantState>,
|
||||
|
||||
#[serde(skip)]
|
||||
path: PathBuf,
|
||||
}
|
||||
|
||||
impl PersistentState {
|
||||
async fn save(&self) -> anyhow::Result<()> {
|
||||
let bytes = serde_json::to_vec(self)?;
|
||||
tokio::fs::write(&self.path, &bytes).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn load(path: &Path) -> anyhow::Result<Self> {
|
||||
let bytes = tokio::fs::read(path).await?;
|
||||
let mut decoded = serde_json::from_slice::<Self>(&bytes)?;
|
||||
decoded.path = path.to_owned();
|
||||
Ok(decoded)
|
||||
}
|
||||
|
||||
async fn load_or_new(path: &Path) -> Self {
|
||||
match Self::load(path).await {
|
||||
Ok(s) => s,
|
||||
Err(e) => {
|
||||
tracing::info!(
|
||||
"Creating new state file at {0} (load returned {e})",
|
||||
path.to_string_lossy()
|
||||
);
|
||||
Self {
|
||||
tenants: HashMap::new(),
|
||||
path: path.to_owned(),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// State available to HTTP request handlers
|
||||
#[derive(Clone)]
|
||||
struct State {
|
||||
inner: Arc<tokio::sync::RwLock<PersistentState>>,
|
||||
}
|
||||
|
||||
impl State {
|
||||
fn new(persistent_state: PersistentState) -> State {
|
||||
Self {
|
||||
inner: Arc::new(tokio::sync::RwLock::new(persistent_state)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
fn get_state(request: &Request<Body>) -> &State {
|
||||
request
|
||||
.data::<Arc<State>>()
|
||||
.expect("unknown state type")
|
||||
.as_ref()
|
||||
}
|
||||
|
||||
/// Pageserver calls into this on startup, to learn which tenants it should attach
|
||||
async fn handle_re_attach(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
let reattach_req = json_request::<ReAttachRequest>(&mut req).await?;
|
||||
|
||||
let state = get_state(&req).inner.clone();
|
||||
let mut locked = state.write().await;
|
||||
|
||||
let mut response = ReAttachResponse {
|
||||
tenants: Vec::new(),
|
||||
};
|
||||
for (t, state) in &mut locked.tenants {
|
||||
if state.pageserver == Some(reattach_req.node_id) {
|
||||
state.generation += 1;
|
||||
response.tenants.push(ReAttachResponseTenant {
|
||||
id: HexTenantId::new(t.clone()),
|
||||
generation: state.generation,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
locked
|
||||
.save()
|
||||
.await
|
||||
.map_err(|e| ApiError::InternalServerError(e))?;
|
||||
|
||||
json_response(StatusCode::OK, response)
|
||||
}
|
||||
|
||||
/// Pageserver calls into this before doing deletions, to confirm that it still
|
||||
/// holds the latest generation for the tenants with deletions enqueued
|
||||
async fn handle_validate(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
let validate_req = json_request::<ValidateRequest>(&mut req).await?;
|
||||
|
||||
let state = get_state(&req).inner.clone();
|
||||
let locked = state.read().await;
|
||||
|
||||
let mut response = ValidateResponse {
|
||||
tenants: Vec::new(),
|
||||
};
|
||||
|
||||
for req_tenant in validate_req.tenants {
|
||||
if let Some(tenant_state) = locked.tenants.get(req_tenant.id.as_ref()) {
|
||||
let valid = tenant_state.generation == req_tenant.gen;
|
||||
response.tenants.push(ValidateResponseTenant {
|
||||
id: req_tenant.id,
|
||||
valid,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
json_response(StatusCode::OK, response)
|
||||
}
|
||||
/// Call into this before attaching a tenant to a pageserver, to acquire a generation number
|
||||
/// (in the real control plane this is unnecessary, because the same program is managing
|
||||
/// generation numbers and doing attachments).
|
||||
async fn handle_attach_hook(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
let attach_req = json_request::<AttachHookRequest>(&mut req).await?;
|
||||
|
||||
let state = get_state(&req).inner.clone();
|
||||
let mut locked = state.write().await;
|
||||
|
||||
let tenant_state = locked
|
||||
.tenants
|
||||
.entry(attach_req.tenant_id.take())
|
||||
.or_insert_with(|| TenantState {
|
||||
pageserver: attach_req.pageserver_id,
|
||||
generation: 0,
|
||||
});
|
||||
|
||||
if attach_req.pageserver_id.is_some() {
|
||||
tenant_state.generation += 1;
|
||||
}
|
||||
let generation = tenant_state.generation;
|
||||
|
||||
locked
|
||||
.save()
|
||||
.await
|
||||
.map_err(|e| ApiError::InternalServerError(e))?;
|
||||
|
||||
json_response(
|
||||
StatusCode::OK,
|
||||
AttachHookResponse {
|
||||
gen: attach_req.pageserver_id.map(|_| generation),
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
fn make_router(persistent_state: PersistentState) -> RouterBuilder<hyper::Body, ApiError> {
|
||||
endpoint::make_router()
|
||||
.data(Arc::new(State::new(persistent_state)))
|
||||
.post("/re-attach", |r| handle_re_attach(r))
|
||||
.post("/validate", |r| handle_validate(r))
|
||||
.post("/attach_hook", |r| handle_attach_hook(r))
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
logging::init(
|
||||
LogFormat::Plain,
|
||||
logging::TracingErrorLayerEnablement::Disabled,
|
||||
)?;
|
||||
|
||||
let args = Cli::parse();
|
||||
tracing::info!(
|
||||
"Starting, state at {}, listening on {}",
|
||||
args.path.to_string_lossy(),
|
||||
args.listen
|
||||
);
|
||||
|
||||
let persistent_state = PersistentState::load_or_new(&args.path).await;
|
||||
|
||||
let http_listener = tcp_listener::bind(&args.listen)?;
|
||||
let router = make_router(persistent_state)
|
||||
.build()
|
||||
.map_err(|err| anyhow!(err))?;
|
||||
let service = utils::http::RouterService::new(router).unwrap();
|
||||
let server = hyper::Server::from_tcp(http_listener)?.serve(service);
|
||||
|
||||
tracing::info!("Serving on {0}", args.listen.as_str());
|
||||
server.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -8,6 +8,7 @@
|
||||
use anyhow::{anyhow, bail, Context, Result};
|
||||
use clap::{value_parser, Arg, ArgAction, ArgMatches, Command};
|
||||
use compute_api::spec::ComputeMode;
|
||||
use control_plane::attachment_service::AttachmentService;
|
||||
use control_plane::endpoint::ComputeControlPlane;
|
||||
use control_plane::local_env::LocalEnv;
|
||||
use control_plane::pageserver::PageServerNode;
|
||||
@@ -43,6 +44,8 @@ project_git_version!(GIT_VERSION);
|
||||
|
||||
const DEFAULT_PG_VERSION: &str = "15";
|
||||
|
||||
const DEFAULT_PAGESERVER_CONTROL_PLANE_API: &str = "http://127.0.0.1:1234/";
|
||||
|
||||
fn default_conf() -> String {
|
||||
format!(
|
||||
r#"
|
||||
@@ -56,11 +59,13 @@ listen_pg_addr = '{DEFAULT_PAGESERVER_PG_ADDR}'
|
||||
listen_http_addr = '{DEFAULT_PAGESERVER_HTTP_ADDR}'
|
||||
pg_auth_type = '{trust_auth}'
|
||||
http_auth_type = '{trust_auth}'
|
||||
control_plane_api = '{DEFAULT_PAGESERVER_CONTROL_PLANE_API}'
|
||||
|
||||
[[safekeepers]]
|
||||
id = {DEFAULT_SAFEKEEPER_ID}
|
||||
pg_port = {DEFAULT_SAFEKEEPER_PG_PORT}
|
||||
http_port = {DEFAULT_SAFEKEEPER_HTTP_PORT}
|
||||
|
||||
"#,
|
||||
trust_auth = AuthType::Trust,
|
||||
)
|
||||
@@ -107,6 +112,7 @@ fn main() -> Result<()> {
|
||||
"start" => handle_start_all(sub_args, &env),
|
||||
"stop" => handle_stop_all(sub_args, &env),
|
||||
"pageserver" => handle_pageserver(sub_args, &env),
|
||||
"attachment_service" => handle_attachment_service(sub_args, &env),
|
||||
"safekeeper" => handle_safekeeper(sub_args, &env),
|
||||
"endpoint" => handle_endpoint(sub_args, &env),
|
||||
"pg" => bail!("'pg' subcommand has been renamed to 'endpoint'"),
|
||||
@@ -342,13 +348,25 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> an
|
||||
}
|
||||
}
|
||||
Some(("create", create_match)) => {
|
||||
let initial_tenant_id = parse_tenant_id(create_match)?;
|
||||
let tenant_conf: HashMap<_, _> = create_match
|
||||
.get_many::<String>("config")
|
||||
.map(|vals| vals.flat_map(|c| c.split_once(':')).collect())
|
||||
.unwrap_or_default();
|
||||
let new_tenant_id = pageserver.tenant_create(initial_tenant_id, tenant_conf)?;
|
||||
println!("tenant {new_tenant_id} successfully created on the pageserver");
|
||||
|
||||
// If tenant ID was not specified, generate one
|
||||
let tenant_id = parse_tenant_id(create_match)?.unwrap_or(TenantId::generate());
|
||||
|
||||
let generation = if env.pageserver.control_plane_api.is_some() {
|
||||
// We must register the tenant with the attachment service, so
|
||||
// that when the pageserver restarts, it will be re-attached.
|
||||
let attachment_service = AttachmentService::from_env(env);
|
||||
attachment_service.attach_hook(tenant_id, env.pageserver.id)?
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
pageserver.tenant_create(tenant_id, generation, tenant_conf)?;
|
||||
println!("tenant {tenant_id} successfully created on the pageserver");
|
||||
|
||||
// Create an initial timeline for the new tenant
|
||||
let new_timeline_id = parse_timeline_id(create_match)?;
|
||||
@@ -358,7 +376,7 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> an
|
||||
.context("Failed to parse postgres version from the argument string")?;
|
||||
|
||||
let timeline_info = pageserver.timeline_create(
|
||||
new_tenant_id,
|
||||
tenant_id,
|
||||
new_timeline_id,
|
||||
None,
|
||||
None,
|
||||
@@ -369,17 +387,17 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> an
|
||||
|
||||
env.register_branch_mapping(
|
||||
DEFAULT_BRANCH_NAME.to_string(),
|
||||
new_tenant_id,
|
||||
tenant_id,
|
||||
new_timeline_id,
|
||||
)?;
|
||||
|
||||
println!(
|
||||
"Created an initial timeline '{new_timeline_id}' at Lsn {last_record_lsn} for tenant: {new_tenant_id}",
|
||||
"Created an initial timeline '{new_timeline_id}' at Lsn {last_record_lsn} for tenant: {tenant_id}",
|
||||
);
|
||||
|
||||
if create_match.get_flag("set-default") {
|
||||
println!("Setting tenant {new_tenant_id} as a default one");
|
||||
env.default_tenant_id = Some(new_tenant_id);
|
||||
println!("Setting tenant {tenant_id} as a default one");
|
||||
env.default_tenant_id = Some(tenant_id);
|
||||
}
|
||||
}
|
||||
Some(("set-default", set_default_match)) => {
|
||||
@@ -817,6 +835,33 @@ fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn handle_attachment_service(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
|
||||
let svc = AttachmentService::from_env(env);
|
||||
match sub_match.subcommand() {
|
||||
Some(("start", _start_match)) => {
|
||||
if let Err(e) = svc.start() {
|
||||
eprintln!("start failed: {e}");
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
Some(("stop", stop_match)) => {
|
||||
let immediate = stop_match
|
||||
.get_one::<String>("stop-mode")
|
||||
.map(|s| s.as_str())
|
||||
== Some("immediate");
|
||||
|
||||
if let Err(e) = svc.stop(immediate) {
|
||||
eprintln!("stop failed: {}", e);
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
Some((sub_name, _)) => bail!("Unexpected attachment_service subcommand '{}'", sub_name),
|
||||
None => bail!("no attachment_service subcommand provided"),
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn get_safekeeper(env: &local_env::LocalEnv, id: NodeId) -> Result<SafekeeperNode> {
|
||||
if let Some(node) = env.safekeepers.iter().find(|node| node.id == id) {
|
||||
Ok(SafekeeperNode::from_env(env, node))
|
||||
@@ -897,6 +942,16 @@ fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> anyhow
|
||||
|
||||
broker::start_broker_process(env)?;
|
||||
|
||||
// Only start the attachment service if the pageserver is configured to need it
|
||||
if env.pageserver.control_plane_api.is_some() {
|
||||
let attachment_service = AttachmentService::from_env(env);
|
||||
if let Err(e) = attachment_service.start() {
|
||||
eprintln!("attachment_service start failed: {:#}", e);
|
||||
try_stop_all(env, true);
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
let pageserver = PageServerNode::from_env(env);
|
||||
if let Err(e) = pageserver.start(&pageserver_config_overrides(sub_match)) {
|
||||
eprintln!("pageserver {} start failed: {:#}", env.pageserver.id, e);
|
||||
@@ -955,6 +1010,13 @@ fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) {
|
||||
if let Err(e) = broker::stop_broker_process(env) {
|
||||
eprintln!("neon broker stop failed: {e:#}");
|
||||
}
|
||||
|
||||
if env.pageserver.control_plane_api.is_some() {
|
||||
let attachment_service = AttachmentService::from_env(env);
|
||||
if let Err(e) = attachment_service.stop(immediate) {
|
||||
eprintln!("attachment service stop failed: {e:#}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn cli() -> Command {
|
||||
@@ -1138,6 +1200,14 @@ fn cli() -> Command {
|
||||
.arg(stop_mode_arg.clone()))
|
||||
.subcommand(Command::new("restart").about("Restart local pageserver").arg(pageserver_config_args.clone()))
|
||||
)
|
||||
.subcommand(
|
||||
Command::new("attachment_service")
|
||||
.arg_required_else_help(true)
|
||||
.about("Manage attachment_service")
|
||||
.subcommand(Command::new("start").about("Start local pageserver").arg(pageserver_config_args.clone()))
|
||||
.subcommand(Command::new("stop").about("Stop local pageserver")
|
||||
.arg(stop_mode_arg.clone()))
|
||||
)
|
||||
.subcommand(
|
||||
Command::new("safekeeper")
|
||||
.arg_required_else_help(true)
|
||||
|
||||
@@ -7,6 +7,7 @@
|
||||
// local installations.
|
||||
//
|
||||
|
||||
pub mod attachment_service;
|
||||
mod background_process;
|
||||
pub mod broker;
|
||||
pub mod endpoint;
|
||||
|
||||
@@ -118,6 +118,9 @@ pub struct PageServerConf {
|
||||
// auth type used for the PG and HTTP ports
|
||||
pub pg_auth_type: AuthType,
|
||||
pub http_auth_type: AuthType,
|
||||
|
||||
// Control plane location
|
||||
pub control_plane_api: Option<Url>,
|
||||
}
|
||||
|
||||
impl Default for PageServerConf {
|
||||
@@ -128,6 +131,7 @@ impl Default for PageServerConf {
|
||||
listen_http_addr: String::new(),
|
||||
pg_auth_type: AuthType::Trust,
|
||||
http_auth_type: AuthType::Trust,
|
||||
control_plane_api: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -202,6 +206,10 @@ impl LocalEnv {
|
||||
self.neon_distrib_dir.join("pageserver")
|
||||
}
|
||||
|
||||
pub fn attachment_service_bin(&self) -> PathBuf {
|
||||
self.neon_distrib_dir.join("attachment_service")
|
||||
}
|
||||
|
||||
pub fn safekeeper_bin(&self) -> PathBuf {
|
||||
self.neon_distrib_dir.join("safekeeper")
|
||||
}
|
||||
|
||||
@@ -126,6 +126,13 @@ impl PageServerNode {
|
||||
broker_endpoint_param,
|
||||
];
|
||||
|
||||
if let Some(control_plane_api) = &self.env.pageserver.control_plane_api {
|
||||
overrides.push(format!(
|
||||
"control_plane_api='{}'",
|
||||
control_plane_api.as_str()
|
||||
));
|
||||
}
|
||||
|
||||
if self.env.pageserver.http_auth_type != AuthType::Trust
|
||||
|| self.env.pageserver.pg_auth_type != AuthType::Trust
|
||||
{
|
||||
@@ -316,7 +323,8 @@ impl PageServerNode {
|
||||
|
||||
pub fn tenant_create(
|
||||
&self,
|
||||
new_tenant_id: Option<TenantId>,
|
||||
new_tenant_id: TenantId,
|
||||
generation: Option<u32>,
|
||||
settings: HashMap<&str, &str>,
|
||||
) -> anyhow::Result<TenantId> {
|
||||
let mut settings = settings.clone();
|
||||
@@ -382,11 +390,9 @@ impl PageServerNode {
|
||||
.context("Failed to parse 'gc_feedback' as bool")?,
|
||||
};
|
||||
|
||||
// If tenant ID was not specified, generate one
|
||||
let new_tenant_id = new_tenant_id.unwrap_or(TenantId::generate());
|
||||
|
||||
let request = models::TenantCreateRequest {
|
||||
new_tenant_id,
|
||||
generation,
|
||||
config,
|
||||
};
|
||||
if !settings.is_empty() {
|
||||
|
||||
@@ -12,6 +12,7 @@ const_format.workspace = true
|
||||
anyhow.workspace = true
|
||||
bytes.workspace = true
|
||||
byteorder.workspace = true
|
||||
hex.workspace = true
|
||||
utils.workspace = true
|
||||
postgres_ffi.workspace = true
|
||||
enum-map.workspace = true
|
||||
|
||||
89
libs/pageserver_api/src/control_api.rs
Normal file
89
libs/pageserver_api/src/control_api.rs
Normal file
@@ -0,0 +1,89 @@
|
||||
/// Types in this file are for pageserver's upward-facing API calls to the control plane
|
||||
use hex::FromHex;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use utils::id::{NodeId, TenantId};
|
||||
|
||||
/// TenantId's serialization is an array of u8, which is rather unfriendly
|
||||
/// for outside callers who aren't working with the native Rust TenantId.
|
||||
/// This class wraps it in serialization that is just the hex strict
|
||||
/// representation.
|
||||
#[derive(Eq, PartialEq, Clone, Hash)]
|
||||
pub struct HexTenantId(TenantId);
|
||||
|
||||
impl HexTenantId {
|
||||
pub fn new(t: TenantId) -> Self {
|
||||
Self(t)
|
||||
}
|
||||
|
||||
pub fn take(self) -> TenantId {
|
||||
self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl AsRef<TenantId> for HexTenantId {
|
||||
fn as_ref(&self) -> &TenantId {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl Serialize for HexTenantId {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::Serializer,
|
||||
{
|
||||
let hex = self.0.hex_encode();
|
||||
serializer.collect_str(&hex)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'de> Deserialize<'de> for HexTenantId {
|
||||
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
||||
where
|
||||
D: serde::Deserializer<'de>,
|
||||
{
|
||||
let string = String::deserialize(deserializer)?;
|
||||
TenantId::from_hex(string)
|
||||
.map(|t| HexTenantId::new(t))
|
||||
.map_err(|e| serde::de::Error::custom(format!("{e}")))
|
||||
}
|
||||
}
|
||||
|
||||
// Top level s
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct ReAttachRequest {
|
||||
pub node_id: NodeId,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct ReAttachResponseTenant {
|
||||
pub id: HexTenantId,
|
||||
pub generation: u32,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct ReAttachResponse {
|
||||
pub tenants: Vec<ReAttachResponseTenant>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct ValidateRequestTenant {
|
||||
pub id: HexTenantId,
|
||||
pub gen: u32,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct ValidateRequest {
|
||||
pub tenants: Vec<ValidateRequestTenant>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct ValidateResponse {
|
||||
pub tenants: Vec<ValidateResponseTenant>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct ValidateResponseTenant {
|
||||
pub id: HexTenantId,
|
||||
pub valid: bool,
|
||||
}
|
||||
@@ -1,6 +1,7 @@
|
||||
use const_format::formatcp;
|
||||
|
||||
/// Public API types
|
||||
pub mod control_api;
|
||||
pub mod models;
|
||||
pub mod reltag;
|
||||
|
||||
|
||||
@@ -194,6 +194,9 @@ pub struct TimelineCreateRequest {
|
||||
pub struct TenantCreateRequest {
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
pub new_tenant_id: TenantId,
|
||||
#[serde(default)]
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub generation: Option<u32>,
|
||||
#[serde(flatten)]
|
||||
pub config: TenantConfig, // as we have a flattened field, we should reject all unknown fields in it
|
||||
}
|
||||
@@ -241,15 +244,6 @@ pub struct StatusResponse {
|
||||
pub id: NodeId,
|
||||
}
|
||||
|
||||
impl TenantCreateRequest {
|
||||
pub fn new(new_tenant_id: TenantId) -> TenantCreateRequest {
|
||||
TenantCreateRequest {
|
||||
new_tenant_id,
|
||||
config: TenantConfig::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[serde_as]
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
#[serde(deny_unknown_fields)]
|
||||
@@ -293,9 +287,11 @@ impl TenantConfigRequest {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct TenantAttachRequest {
|
||||
pub config: TenantAttachConfig,
|
||||
#[serde(default)]
|
||||
pub generation: Option<u32>,
|
||||
}
|
||||
|
||||
/// Newtype to enforce deny_unknown_fields on TenantConfig for
|
||||
|
||||
@@ -13,13 +13,14 @@ use std::{
|
||||
collections::HashMap,
|
||||
fmt::Debug,
|
||||
num::{NonZeroU32, NonZeroUsize},
|
||||
path::{Path, PathBuf},
|
||||
path::{Path, PathBuf, StripPrefixError},
|
||||
pin::Pin,
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use anyhow::{bail, Context};
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::io;
|
||||
use toml_edit::Item;
|
||||
use tracing::info;
|
||||
@@ -44,12 +45,34 @@ pub const DEFAULT_MAX_KEYS_PER_LIST_RESPONSE: Option<i32> = None;
|
||||
|
||||
const REMOTE_STORAGE_PREFIX_SEPARATOR: char = '/';
|
||||
|
||||
// From the S3 spec
|
||||
pub const MAX_KEYS_PER_DELETE: usize = 1000;
|
||||
|
||||
/// Path on the remote storage, relative to some inner prefix.
|
||||
/// The prefix is an implementation detail, that allows representing local paths
|
||||
/// as the remote ones, stripping the local storage prefix away.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
|
||||
pub struct RemotePath(PathBuf);
|
||||
|
||||
impl Serialize for RemotePath {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::Serializer,
|
||||
{
|
||||
serializer.collect_str(self)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'de> Deserialize<'de> for RemotePath {
|
||||
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
||||
where
|
||||
D: serde::Deserializer<'de>,
|
||||
{
|
||||
let str = String::deserialize(deserializer)?;
|
||||
Ok(Self(PathBuf::from(&str)))
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for RemotePath {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}", self.0.display())
|
||||
@@ -88,6 +111,15 @@ impl RemotePath {
|
||||
pub fn extension(&self) -> Option<&str> {
|
||||
self.0.extension()?.to_str()
|
||||
}
|
||||
|
||||
/// Unwrap the PathBuf that RemotePath wraps
|
||||
pub fn take(self) -> PathBuf {
|
||||
self.0
|
||||
}
|
||||
|
||||
pub fn strip_prefix(&self, p: &RemotePath) -> Result<&Path, StripPrefixError> {
|
||||
self.0.strip_prefix(&p.0)
|
||||
}
|
||||
}
|
||||
|
||||
/// Storage (potentially remote) API to manage its state.
|
||||
@@ -166,6 +198,8 @@ pub enum DownloadError {
|
||||
BadInput(anyhow::Error),
|
||||
/// The file was not found in the remote storage.
|
||||
NotFound,
|
||||
/// The client was shut down
|
||||
Shutdown,
|
||||
/// The file was found in the remote storage, but the download failed.
|
||||
Other(anyhow::Error),
|
||||
}
|
||||
@@ -177,6 +211,7 @@ impl std::fmt::Display for DownloadError {
|
||||
write!(f, "Failed to download a remote file due to user input: {e}")
|
||||
}
|
||||
DownloadError::NotFound => write!(f, "No file found for the remote object id given"),
|
||||
DownloadError::Shutdown => write!(f, "Client shutting down"),
|
||||
DownloadError::Other(e) => write!(f, "Failed to download a remote file: {e:?}"),
|
||||
}
|
||||
}
|
||||
@@ -241,6 +276,18 @@ impl GenericRemoteStorage {
|
||||
}
|
||||
}
|
||||
|
||||
/// For small, simple downloads where caller doesn't want to handle the streaming: return the full body
|
||||
pub async fn download_all(&self, from: &RemotePath) -> Result<Vec<u8>, DownloadError> {
|
||||
let mut download = self.download(from).await?;
|
||||
|
||||
let mut bytes = Vec::new();
|
||||
tokio::io::copy(&mut download.download_stream, &mut bytes)
|
||||
.await
|
||||
.with_context(|| format!("Failed to download body from {from}"))
|
||||
.map_err(DownloadError::Other)?;
|
||||
Ok(bytes)
|
||||
}
|
||||
|
||||
pub async fn download_byte_range(
|
||||
&self,
|
||||
from: &RemotePath,
|
||||
|
||||
@@ -148,21 +148,53 @@ impl RemoteStorage for LocalFs {
|
||||
Some(folder) => folder.with_base(&self.storage_root),
|
||||
None => self.storage_root.clone(),
|
||||
};
|
||||
let mut files = vec![];
|
||||
let mut directory_queue = vec![full_path.clone()];
|
||||
|
||||
// If we were given a directory, we may use it as our starting point.
|
||||
// Otherwise, we must go up to the parent directory. This is because
|
||||
// S3 object list prefixes can be arbitrary strings, but when reading
|
||||
// the local filesystem we need a directory to start calling read_dir on.
|
||||
let mut initial_dir = full_path.clone();
|
||||
match fs::metadata(full_path.clone()).await {
|
||||
Err(e) => {
|
||||
// It's not a file that exists: strip the prefix back to the parent directory
|
||||
if matches!(e.kind(), ErrorKind::NotFound) {
|
||||
initial_dir.pop();
|
||||
}
|
||||
}
|
||||
Ok(meta) => {
|
||||
if !meta.is_dir() {
|
||||
// It's not a directory: strip back to the parent
|
||||
initial_dir.pop();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Note that PathBuf starts_with only considers full path segments, but
|
||||
// object prefixes are arbitrary strings, so we need the strings for doing
|
||||
// starts_with later.
|
||||
let prefix = full_path.to_string_lossy();
|
||||
|
||||
let mut files = vec![];
|
||||
let mut directory_queue = vec![initial_dir.clone()];
|
||||
while let Some(cur_folder) = directory_queue.pop() {
|
||||
let mut entries = fs::read_dir(cur_folder.clone()).await?;
|
||||
while let Some(entry) = entries.next_entry().await? {
|
||||
let file_name: PathBuf = entry.file_name().into();
|
||||
let full_file_name = cur_folder.clone().join(&file_name);
|
||||
let file_remote_path = self.local_file_to_relative_path(full_file_name.clone());
|
||||
files.push(file_remote_path.clone());
|
||||
if full_file_name.is_dir() {
|
||||
directory_queue.push(full_file_name);
|
||||
if full_file_name
|
||||
.to_str()
|
||||
.map(|s| s.starts_with(prefix.as_ref()))
|
||||
.unwrap_or(false)
|
||||
{
|
||||
let file_remote_path = self.local_file_to_relative_path(full_file_name.clone());
|
||||
files.push(file_remote_path.clone());
|
||||
if full_file_name.is_dir() {
|
||||
directory_queue.push(full_file_name);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(files)
|
||||
}
|
||||
|
||||
|
||||
@@ -22,7 +22,7 @@ use aws_sdk_s3::{
|
||||
Client,
|
||||
};
|
||||
use aws_smithy_http::body::SdkBody;
|
||||
use hyper::Body;
|
||||
use hyper::{Body, StatusCode};
|
||||
use scopeguard::ScopeGuard;
|
||||
use tokio::{
|
||||
io::{self, AsyncRead},
|
||||
@@ -529,7 +529,16 @@ impl RemoteStorage for S3Bucket {
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
return Err(e.into());
|
||||
if let Some(r) = e.raw_response() {
|
||||
if r.http().status() == StatusCode::NOT_FOUND {
|
||||
// 404 is acceptable for deletions. AWS S3 does not return this, but
|
||||
// some other implementations might (e.g. GCS XML API returns 404 on DeleteObject
|
||||
// to a missing key)
|
||||
continue;
|
||||
} else {
|
||||
return Err(anyhow::format_err!("DeleteObjects response error: {e}"));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
121
libs/utils/src/generation.rs
Normal file
121
libs/utils/src/generation.rs
Normal file
@@ -0,0 +1,121 @@
|
||||
use std::fmt::Display;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Copy, Clone, Debug, Eq, PartialEq, PartialOrd, Ord)]
|
||||
pub enum Generation {
|
||||
// Generations with this magic value will not add a suffix to S3 keys, and will not
|
||||
// be included in persisted index_part.json. This value is only to be used
|
||||
// during migration from pre-generation metadata to generation-aware metadata,
|
||||
// and should eventually go away.
|
||||
//
|
||||
// A special Generation is used rather than always wrapping Generation in an Option,
|
||||
// so that code handling generations doesn't have to be aware of the legacy
|
||||
// case everywhere it touches a generation.
|
||||
None,
|
||||
// Generations with this magic value may never be used to construct S3 keys:
|
||||
// we will panic if someone tries to. This is for Tenants in the "Broken" state,
|
||||
// so that we can satisfy their constructor with a Generation without risking
|
||||
// a code bug using it in an S3 write (broken tenants should never write)
|
||||
Broken,
|
||||
Valid(u32),
|
||||
}
|
||||
|
||||
/// The Generation type represents a number associated with a Tenant, which
|
||||
/// increments every time the tenant is attached to a new pageserver, or
|
||||
/// an attached pageserver restarts.
|
||||
///
|
||||
/// It is included as a suffix in S3 keys, as a protection against split-brain
|
||||
/// scenarios where pageservers might otherwise issue conflicting writes to
|
||||
/// remote storage
|
||||
impl Generation {
|
||||
/// Create a new Generation that represents a legacy key format with
|
||||
/// no generation suffix
|
||||
pub fn none() -> Self {
|
||||
Self::None
|
||||
}
|
||||
|
||||
// Create a new generation that will panic if you try to use get_suffix
|
||||
pub fn broken() -> Self {
|
||||
Self::Broken
|
||||
}
|
||||
|
||||
pub fn new(v: u32) -> Self {
|
||||
Self::Valid(v)
|
||||
}
|
||||
|
||||
pub fn is_none(&self) -> bool {
|
||||
matches!(self, Self::None)
|
||||
}
|
||||
|
||||
pub fn get_suffix(&self) -> String {
|
||||
match self {
|
||||
Self::Valid(v) => {
|
||||
format!("-{:08x}", v)
|
||||
}
|
||||
Self::None => "".into(),
|
||||
Self::Broken => {
|
||||
panic!("Tried to use a broken generation");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn previous(&self) -> Self {
|
||||
if let Self::Valid(v) = self {
|
||||
Self::new(v - 1)
|
||||
} else {
|
||||
Self::none()
|
||||
}
|
||||
}
|
||||
|
||||
pub fn into(self) -> Option<u32> {
|
||||
if let Self::Valid(v) = self {
|
||||
Some(v)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Serialize for Generation {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::Serializer,
|
||||
{
|
||||
if let Self::Valid(v) = self {
|
||||
v.serialize(serializer)
|
||||
} else {
|
||||
// We should never be asked to serialize a None or Broken. Structures
|
||||
// that include an optional generation should convert None to an
|
||||
// Option<Generation>::None
|
||||
Err(serde::ser::Error::custom(
|
||||
"Tried to serialize invalid generation",
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'de> Deserialize<'de> for Generation {
|
||||
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
||||
where
|
||||
D: serde::Deserializer<'de>,
|
||||
{
|
||||
Ok(Self::Valid(u32::deserialize(deserializer)?))
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for Generation {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
Self::Valid(v) => {
|
||||
write!(f, "{:08x}", v)
|
||||
}
|
||||
Self::None => {
|
||||
write!(f, "<none>")
|
||||
}
|
||||
Self::Broken => {
|
||||
write!(f, "<broken>")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -24,6 +24,9 @@ pub enum ApiError {
|
||||
#[error("Precondition failed: {0}")]
|
||||
PreconditionFailed(Box<str>),
|
||||
|
||||
#[error("Shutting down")]
|
||||
ShuttingDown,
|
||||
|
||||
#[error(transparent)]
|
||||
InternalServerError(anyhow::Error),
|
||||
}
|
||||
@@ -52,6 +55,10 @@ impl ApiError {
|
||||
self.to_string(),
|
||||
StatusCode::PRECONDITION_FAILED,
|
||||
),
|
||||
ApiError::ShuttingDown => HttpErrorBody::response_from_msg_and_status(
|
||||
"Shutting down".to_string(),
|
||||
StatusCode::SERVICE_UNAVAILABLE,
|
||||
),
|
||||
ApiError::InternalServerError(err) => HttpErrorBody::response_from_msg_and_status(
|
||||
err.to_string(),
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
|
||||
@@ -50,7 +50,7 @@ impl Id {
|
||||
Id::from(tli_buf)
|
||||
}
|
||||
|
||||
fn hex_encode(&self) -> String {
|
||||
pub fn hex_encode(&self) -> String {
|
||||
static HEX: &[u8] = b"0123456789abcdef";
|
||||
|
||||
let mut buf = vec![0u8; self.0.len() * 2];
|
||||
@@ -133,6 +133,10 @@ macro_rules! id_newtype {
|
||||
pub const fn from_array(b: [u8; 16]) -> Self {
|
||||
$t(Id(b))
|
||||
}
|
||||
|
||||
pub fn hex_encode(&self) -> String {
|
||||
self.0.hex_encode()
|
||||
}
|
||||
}
|
||||
|
||||
impl FromStr for $t {
|
||||
@@ -244,13 +248,13 @@ id_newtype!(TenantId);
|
||||
/// NOTE: It (de)serializes as an array of hex bytes, so the string representation would look
|
||||
/// like `[173,80,132,115,129,226,72,254,170,201,135,108,199,26,228,24]`.
|
||||
/// See [`Id`] for alternative ways to serialize it.
|
||||
#[derive(Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord)]
|
||||
#[derive(Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
|
||||
pub struct ConnectionId(Id);
|
||||
|
||||
id_newtype!(ConnectionId);
|
||||
|
||||
// A pair uniquely identifying Neon instance.
|
||||
#[derive(Debug, Clone, Copy, PartialOrd, Ord, PartialEq, Eq, Hash, Serialize, Deserialize)]
|
||||
#[derive(Debug, Clone, Copy, PartialOrd, Ord, PartialEq, Eq, Hash)]
|
||||
pub struct TenantTimelineId {
|
||||
pub tenant_id: TenantId,
|
||||
pub timeline_id: TimelineId,
|
||||
@@ -273,6 +277,36 @@ impl TenantTimelineId {
|
||||
}
|
||||
}
|
||||
|
||||
impl Serialize for TenantTimelineId {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::Serializer,
|
||||
{
|
||||
serializer.collect_str(self)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'de> Deserialize<'de> for TenantTimelineId {
|
||||
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
||||
where
|
||||
D: serde::Deserializer<'de>,
|
||||
{
|
||||
let str = String::deserialize(deserializer)?;
|
||||
if let Some((tenant_part, timeline_part)) = str.split_once('/') {
|
||||
Ok(Self {
|
||||
tenant_id: TenantId(Id::from_hex(tenant_part).map_err(|e| {
|
||||
serde::de::Error::custom(format!("Malformed tenant in TenantTimelineId: {e}"))
|
||||
})?),
|
||||
timeline_id: TimelineId(Id::from_hex(timeline_part).map_err(|e| {
|
||||
serde::de::Error::custom(format!("Malformed timeline in TenantTimelineId {e}"))
|
||||
})?),
|
||||
})
|
||||
} else {
|
||||
Err(serde::de::Error::custom("Malformed TenantTimelineId"))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for TenantTimelineId {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(f, "{}/{}", self.tenant_id, self.timeline_id)
|
||||
|
||||
@@ -27,6 +27,9 @@ pub mod id;
|
||||
// http endpoint utils
|
||||
pub mod http;
|
||||
|
||||
// definition of the Generation type for pageserver attachment APIs
|
||||
pub mod generation;
|
||||
|
||||
// common log initialisation routine
|
||||
pub mod logging;
|
||||
|
||||
|
||||
@@ -2,12 +2,14 @@
|
||||
|
||||
use std::env::{var, VarError};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use std::{env, ops::ControlFlow, path::Path, str::FromStr};
|
||||
|
||||
use anyhow::{anyhow, Context};
|
||||
use clap::{Arg, ArgAction, Command};
|
||||
|
||||
use metrics::launch_timestamp::{set_launch_timestamp_metric, LaunchTimestamp};
|
||||
use pageserver::deletion_queue::{DeletionQueue, DeletionQueueError};
|
||||
use pageserver::disk_usage_eviction_task::{self, launch_disk_usage_global_eviction_task};
|
||||
use pageserver::metrics::{STARTUP_DURATION, STARTUP_IS_LOADING};
|
||||
use pageserver::task_mgr::WALRECEIVER_RUNTIME;
|
||||
@@ -349,6 +351,35 @@ fn start_pageserver(
|
||||
// Set up remote storage client
|
||||
let remote_storage = create_remote_storage_client(conf)?;
|
||||
|
||||
// Set up deletion queue
|
||||
let deletion_queue_cancel = tokio_util::sync::CancellationToken::new();
|
||||
let (deletion_queue, deletion_frontend, deletion_backend, deletion_executor) =
|
||||
DeletionQueue::new(remote_storage.clone(), conf, deletion_queue_cancel.clone());
|
||||
if let Some(mut deletion_frontend) = deletion_frontend {
|
||||
BACKGROUND_RUNTIME.spawn(async move {
|
||||
deletion_frontend
|
||||
.background()
|
||||
.instrument(info_span!(parent:None, "deletion frontend"))
|
||||
.await
|
||||
});
|
||||
}
|
||||
if let Some(mut deletion_backend) = deletion_backend {
|
||||
BACKGROUND_RUNTIME.spawn(async move {
|
||||
deletion_backend
|
||||
.background()
|
||||
.instrument(info_span!(parent: None, "deletion backend"))
|
||||
.await
|
||||
});
|
||||
}
|
||||
if let Some(mut deletion_executor) = deletion_executor {
|
||||
BACKGROUND_RUNTIME.spawn(async move {
|
||||
deletion_executor
|
||||
.background()
|
||||
.instrument(info_span!(parent: None, "deletion executor"))
|
||||
.await
|
||||
});
|
||||
}
|
||||
|
||||
// Up to this point no significant I/O has been done: this should have been fast. Record
|
||||
// duration prior to starting I/O intensive phase of startup.
|
||||
startup_checkpoint("initial", "Starting loading tenants");
|
||||
@@ -386,6 +417,7 @@ fn start_pageserver(
|
||||
TenantSharedResources {
|
||||
broker_client: broker_client.clone(),
|
||||
remote_storage: remote_storage.clone(),
|
||||
deletion_queue_client: deletion_queue.new_client(),
|
||||
},
|
||||
order,
|
||||
))?;
|
||||
@@ -482,6 +514,7 @@ fn start_pageserver(
|
||||
http_auth,
|
||||
broker_client.clone(),
|
||||
remote_storage,
|
||||
deletion_queue.clone(),
|
||||
disk_usage_eviction_state,
|
||||
)?
|
||||
.build()
|
||||
@@ -604,6 +637,36 @@ fn start_pageserver(
|
||||
// The plan is to change that over time.
|
||||
shutdown_pageserver.take();
|
||||
BACKGROUND_RUNTIME.block_on(pageserver::shutdown_pageserver(0));
|
||||
|
||||
// Best effort to persist any outstanding deletions, to avoid leaking objects
|
||||
let dq = deletion_queue.clone();
|
||||
BACKGROUND_RUNTIME.block_on(async move {
|
||||
match tokio::time::timeout(Duration::from_secs(5), dq.new_client().flush()).await {
|
||||
Ok(flush_r) => {
|
||||
match flush_r {
|
||||
Ok(()) => {
|
||||
info!("Deletion queue flushed successfully on shutdown")
|
||||
}
|
||||
Err(e) => {
|
||||
match e {
|
||||
DeletionQueueError::ShuttingDown => {
|
||||
// This is not harmful for correctness, but is unexpected: the deletion
|
||||
// queue's workers should stay alive as long as there are any client handles instantiated.
|
||||
warn!("Deletion queue stopped prematurely");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("Timed out flushing deletion queue on shutdown ({e})")
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Clean shutdown of deletion queue workers
|
||||
deletion_queue_cancel.cancel();
|
||||
|
||||
unreachable!()
|
||||
}
|
||||
})
|
||||
|
||||
@@ -204,6 +204,8 @@ pub struct PageServerConf {
|
||||
/// has it's initial logical size calculated. Not running background tasks for some seconds is
|
||||
/// not terrible.
|
||||
pub background_task_maximum_delay: Duration,
|
||||
|
||||
pub control_plane_api: Option<Url>,
|
||||
}
|
||||
|
||||
/// We do not want to store this in a PageServerConf because the latter may be logged
|
||||
@@ -278,6 +280,8 @@ struct PageServerConfigBuilder {
|
||||
ondemand_download_behavior_treat_error_as_warn: BuilderValue<bool>,
|
||||
|
||||
background_task_maximum_delay: BuilderValue<Duration>,
|
||||
|
||||
control_plane_api: BuilderValue<Option<Url>>,
|
||||
}
|
||||
|
||||
impl Default for PageServerConfigBuilder {
|
||||
@@ -340,6 +344,8 @@ impl Default for PageServerConfigBuilder {
|
||||
DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY,
|
||||
)
|
||||
.unwrap()),
|
||||
|
||||
control_plane_api: Set(None),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -468,6 +474,10 @@ impl PageServerConfigBuilder {
|
||||
self.background_task_maximum_delay = BuilderValue::Set(delay);
|
||||
}
|
||||
|
||||
pub fn control_plane_api(&mut self, api: Url) {
|
||||
self.control_plane_api = BuilderValue::Set(Some(api))
|
||||
}
|
||||
|
||||
pub fn build(self) -> anyhow::Result<PageServerConf> {
|
||||
let concurrent_tenant_size_logical_size_queries = self
|
||||
.concurrent_tenant_size_logical_size_queries
|
||||
@@ -553,6 +563,9 @@ impl PageServerConfigBuilder {
|
||||
background_task_maximum_delay: self
|
||||
.background_task_maximum_delay
|
||||
.ok_or(anyhow!("missing background_task_maximum_delay"))?,
|
||||
control_plane_api: self
|
||||
.control_plane_api
|
||||
.ok_or(anyhow!("missing control_plane_api"))?,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -566,6 +579,27 @@ impl PageServerConf {
|
||||
self.workdir.join("tenants")
|
||||
}
|
||||
|
||||
pub fn deletion_prefix(&self) -> PathBuf {
|
||||
self.workdir.join("deletion")
|
||||
}
|
||||
|
||||
pub fn deletion_list_path(&self, sequence: u64) -> PathBuf {
|
||||
// Encode a version in the filename, so that if we ever switch away from JSON we can
|
||||
// increment this.
|
||||
const VERSION: u8 = 1;
|
||||
|
||||
self.deletion_prefix()
|
||||
.join(format!("{sequence:016x}-{VERSION:02x}.list"))
|
||||
}
|
||||
|
||||
pub fn deletion_header_path(&self) -> PathBuf {
|
||||
// Encode a version in the filename, so that if we ever switch away from JSON we can
|
||||
// increment this.
|
||||
const VERSION: u8 = 1;
|
||||
|
||||
self.deletion_prefix().join(format!("header-{VERSION:02x}"))
|
||||
}
|
||||
|
||||
pub fn tenant_path(&self, tenant_id: &TenantId) -> PathBuf {
|
||||
self.tenants_path().join(tenant_id.to_string())
|
||||
}
|
||||
@@ -643,23 +677,6 @@ impl PageServerConf {
|
||||
.join(METADATA_FILE_NAME)
|
||||
}
|
||||
|
||||
/// Files on the remote storage are stored with paths, relative to the workdir.
|
||||
/// That path includes in itself both tenant and timeline ids, allowing to have a unique remote storage path.
|
||||
///
|
||||
/// Errors if the path provided does not start from pageserver's workdir.
|
||||
pub fn remote_path(&self, local_path: &Path) -> anyhow::Result<RemotePath> {
|
||||
local_path
|
||||
.strip_prefix(&self.workdir)
|
||||
.context("Failed to strip workdir prefix")
|
||||
.and_then(RemotePath::new)
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to resolve remote part of path {:?} for base {:?}",
|
||||
local_path, self.workdir
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
/// Turns storage remote path of a file into its local path.
|
||||
pub fn local_path(&self, remote_path: &RemotePath) -> PathBuf {
|
||||
remote_path.with_base(&self.workdir)
|
||||
@@ -758,6 +775,7 @@ impl PageServerConf {
|
||||
},
|
||||
"ondemand_download_behavior_treat_error_as_warn" => builder.ondemand_download_behavior_treat_error_as_warn(parse_toml_bool(key, item)?),
|
||||
"background_task_maximum_delay" => builder.background_task_maximum_delay(parse_toml_duration(key, item)?),
|
||||
"control_plane_api" => builder.control_plane_api(parse_toml_string(key, item)?.parse().context("failed to parse control plane URL")?),
|
||||
_ => bail!("unrecognized pageserver option '{key}'"),
|
||||
}
|
||||
}
|
||||
@@ -926,6 +944,7 @@ impl PageServerConf {
|
||||
test_remote_failures: 0,
|
||||
ondemand_download_behavior_treat_error_as_warn: false,
|
||||
background_task_maximum_delay: Duration::ZERO,
|
||||
control_plane_api: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1149,6 +1168,7 @@ background_task_maximum_delay = '334 s'
|
||||
background_task_maximum_delay: humantime::parse_duration(
|
||||
defaults::DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY
|
||||
)?,
|
||||
control_plane_api: None
|
||||
},
|
||||
"Correct defaults should be used when no config values are provided"
|
||||
);
|
||||
@@ -1204,6 +1224,7 @@ background_task_maximum_delay = '334 s'
|
||||
test_remote_failures: 0,
|
||||
ondemand_download_behavior_treat_error_as_warn: false,
|
||||
background_task_maximum_delay: Duration::from_secs(334),
|
||||
control_plane_api: None
|
||||
},
|
||||
"Should be able to parse all basic config values correctly"
|
||||
);
|
||||
|
||||
850
pageserver/src/deletion_queue.rs
Normal file
850
pageserver/src/deletion_queue.rs
Normal file
@@ -0,0 +1,850 @@
|
||||
mod backend;
|
||||
mod executor;
|
||||
mod frontend;
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::path::PathBuf;
|
||||
|
||||
use crate::metrics::DELETION_QUEUE_SUBMITTED;
|
||||
use crate::tenant::remote_timeline_client::remote_timeline_path;
|
||||
use remote_storage::{GenericRemoteStorage, RemotePath};
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
use serde_with::serde_as;
|
||||
use thiserror::Error;
|
||||
use tokio;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{self, debug, error};
|
||||
use utils::generation::Generation;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
|
||||
pub(crate) use self::backend::BackendQueueWorker;
|
||||
use self::executor::ExecutorWorker;
|
||||
use self::frontend::DeletionOp;
|
||||
pub(crate) use self::frontend::FrontendQueueWorker;
|
||||
use backend::BackendQueueMessage;
|
||||
use executor::ExecutorMessage;
|
||||
use frontend::FrontendQueueMessage;
|
||||
|
||||
use crate::{config::PageServerConf, tenant::storage_layer::LayerFileName};
|
||||
|
||||
// TODO: adminstrative "panic button" config property to disable all deletions
|
||||
// TODO: configurable for how long to wait before executing deletions
|
||||
|
||||
/// We aggregate object deletions from many tenants in one place, for several reasons:
|
||||
/// - Coalesce deletions into fewer DeleteObjects calls
|
||||
/// - Enable Tenant/Timeline lifetimes to be shorter than the time it takes
|
||||
/// to flush any outstanding deletions.
|
||||
/// - Globally control throughput of deletions, as these are a low priority task: do
|
||||
/// not compete with the same S3 clients/connections used for higher priority uploads.
|
||||
/// - Future: enable validating that we may do deletions in a multi-attached scenario,
|
||||
/// via generation numbers (see https://github.com/neondatabase/neon/pull/4919)
|
||||
///
|
||||
/// There are two kinds of deletion: deferred and immediate. A deferred deletion
|
||||
/// may be intentionally delayed to protect passive readers of S3 data, and may
|
||||
/// be subject to a generation number validation step. An immediate deletion is
|
||||
/// ready to execute immediately, and is only queued up so that it can be coalesced
|
||||
/// with other deletions in flight.
|
||||
///
|
||||
/// Deferred deletions pass through three steps:
|
||||
/// - Frontend: accumulate deletion requests from Timelines, and batch them up into
|
||||
/// DeletionLists, which are persisted to S3.
|
||||
/// - Backend: accumulate deletion lists, and validate them en-masse prior to passing
|
||||
/// the keys in the list onward for actual deletion
|
||||
/// - Executor: accumulate object keys that the backend has validated for immediate
|
||||
/// deletion, and execute them in batches of 1000 keys via DeleteObjects.
|
||||
///
|
||||
/// Non-deferred deletions, such as during timeline deletion, bypass the first
|
||||
/// two stages and are passed straight into the Executor.
|
||||
///
|
||||
/// Internally, each stage is joined by a channel to the next. In S3, there is only
|
||||
/// one queue (of DeletionLists), which is written by the frontend and consumed
|
||||
/// by the backend.
|
||||
#[derive(Clone)]
|
||||
pub struct DeletionQueue {
|
||||
client: DeletionQueueClient,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct FlushOp {
|
||||
tx: tokio::sync::oneshot::Sender<()>,
|
||||
}
|
||||
|
||||
impl FlushOp {
|
||||
fn fire(self) {
|
||||
if self.tx.send(()).is_err() {
|
||||
// oneshot channel closed. This is legal: a client could be destroyed while waiting for a flush.
|
||||
debug!("deletion queue flush from dropped client");
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct DeletionQueueClient {
|
||||
tx: tokio::sync::mpsc::Sender<FrontendQueueMessage>,
|
||||
executor_tx: tokio::sync::mpsc::Sender<ExecutorMessage>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
struct TenantDeletionList {
|
||||
/// For each Timeline, a list of key fragments to append to the timeline remote path
|
||||
/// when reconstructing a full key
|
||||
timelines: HashMap<TimelineId, Vec<String>>,
|
||||
|
||||
/// The generation in which this deletion was emitted: note that this may not be the
|
||||
/// same as the generation of any layers being deleted. The generation of the layer
|
||||
/// has already been absorbed into the keys in `objects`
|
||||
generation: Generation,
|
||||
}
|
||||
|
||||
#[serde_as]
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
struct DeletionList {
|
||||
/// Serialization version, for future use
|
||||
version: u8,
|
||||
|
||||
/// Used for constructing a unique key for each deletion list we write out.
|
||||
sequence: u64,
|
||||
|
||||
/// To avoid repeating tenant/timeline IDs in every key, we store keys in
|
||||
/// nested HashMaps by TenantTimelineID. Each Tenant only appears once
|
||||
/// with one unique generation ID: if someone tries to push a second generation
|
||||
/// ID for the same tenant, we will start a new DeletionList.
|
||||
tenants: HashMap<TenantId, TenantDeletionList>,
|
||||
|
||||
/// Avoid having to walk `tenants` to calculate size
|
||||
size: usize,
|
||||
}
|
||||
|
||||
#[serde_as]
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
struct DeletionHeader {
|
||||
/// Serialization version, for future use
|
||||
version: u8,
|
||||
|
||||
/// Enable determining the next sequence number even if there are no deletion lists present.
|
||||
/// If there _are_ deletion lists present, then their sequence numbers take precedence over
|
||||
/// this.
|
||||
last_deleted_list_seq: u64,
|
||||
// TODO: this is where we will track a 'clean' sequence number that indicates all deletion
|
||||
// lists <= that sequence have had their generations validated with the control plane
|
||||
// and are OK to execute.
|
||||
}
|
||||
|
||||
impl DeletionHeader {
|
||||
const VERSION_LATEST: u8 = 1;
|
||||
|
||||
fn new(last_deleted_list_seq: u64) -> Self {
|
||||
Self {
|
||||
version: Self::VERSION_LATEST,
|
||||
last_deleted_list_seq,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl DeletionList {
|
||||
const VERSION_LATEST: u8 = 1;
|
||||
fn new(sequence: u64) -> Self {
|
||||
Self {
|
||||
version: Self::VERSION_LATEST,
|
||||
sequence,
|
||||
tenants: HashMap::new(),
|
||||
size: 0,
|
||||
}
|
||||
}
|
||||
|
||||
fn drain(&mut self) -> Self {
|
||||
let mut tenants = HashMap::new();
|
||||
std::mem::swap(&mut self.tenants, &mut tenants);
|
||||
let other = Self {
|
||||
version: Self::VERSION_LATEST,
|
||||
sequence: self.sequence,
|
||||
tenants,
|
||||
size: self.size,
|
||||
};
|
||||
self.size = 0;
|
||||
other
|
||||
}
|
||||
|
||||
fn is_empty(&self) -> bool {
|
||||
self.tenants.is_empty()
|
||||
}
|
||||
|
||||
fn len(&self) -> usize {
|
||||
self.size
|
||||
}
|
||||
|
||||
/// Returns true if the push was accepted, false if the caller must start a new
|
||||
/// deletion list.
|
||||
fn push(
|
||||
&mut self,
|
||||
tenant: &TenantId,
|
||||
timeline: &TimelineId,
|
||||
generation: Generation,
|
||||
objects: &mut Vec<RemotePath>,
|
||||
) -> bool {
|
||||
if objects.is_empty() {
|
||||
// Avoid inserting an empty TimelineDeletionList: this preserves the property
|
||||
// that if we have no keys, then self.objects is empty (used in Self::is_empty)
|
||||
return true;
|
||||
}
|
||||
|
||||
let tenant_entry = self
|
||||
.tenants
|
||||
.entry(*tenant)
|
||||
.or_insert_with(|| TenantDeletionList {
|
||||
timelines: HashMap::new(),
|
||||
generation: generation,
|
||||
});
|
||||
|
||||
if tenant_entry.generation != generation {
|
||||
// Only one generation per tenant per list: signal to
|
||||
// caller to start a new list.
|
||||
return false;
|
||||
}
|
||||
|
||||
let timeline_entry = tenant_entry
|
||||
.timelines
|
||||
.entry(*timeline)
|
||||
.or_insert_with(|| Vec::new());
|
||||
|
||||
let timeline_remote_path = remote_timeline_path(tenant, timeline);
|
||||
|
||||
self.size += objects.len();
|
||||
timeline_entry.extend(objects.drain(..).map(|p| {
|
||||
p.strip_prefix(&timeline_remote_path)
|
||||
.expect("Timeline paths always start with the timeline prefix")
|
||||
.to_string_lossy()
|
||||
.to_string()
|
||||
}));
|
||||
true
|
||||
}
|
||||
|
||||
fn take_paths(self) -> Vec<RemotePath> {
|
||||
let mut result = Vec::new();
|
||||
for (tenant, tenant_deletions) in self.tenants.into_iter() {
|
||||
for (timeline, timeline_layers) in tenant_deletions.timelines.into_iter() {
|
||||
let timeline_remote_path = remote_timeline_path(&tenant, &timeline);
|
||||
result.extend(
|
||||
timeline_layers
|
||||
.into_iter()
|
||||
.map(|l| timeline_remote_path.join(&PathBuf::from(l))),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
result
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub enum DeletionQueueError {
|
||||
#[error("Deletion queue unavailable during shutdown")]
|
||||
ShuttingDown,
|
||||
}
|
||||
|
||||
impl DeletionQueueClient {
|
||||
async fn do_push(&self, msg: FrontendQueueMessage) -> Result<(), DeletionQueueError> {
|
||||
match self.tx.send(msg).await {
|
||||
Ok(_) => Ok(()),
|
||||
Err(e) => {
|
||||
// This shouldn't happen, we should shut down all tenants before
|
||||
// we shut down the global delete queue. If we encounter a bug like this,
|
||||
// we may leak objects as deletions won't be processed.
|
||||
error!("Deletion queue closed while pushing, shutting down? ({e})");
|
||||
Err(DeletionQueueError::ShuttingDown)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Submit a list of layers for deletion: this function will return before the deletion is
|
||||
/// persistent, but it may be executed at any time after this function enters: do not push
|
||||
/// layers until you're sure they can be deleted safely (i.e. remote metadata no longer
|
||||
/// references them).
|
||||
pub(crate) async fn push_layers(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
generation: Generation,
|
||||
layers: Vec<(LayerFileName, Generation)>,
|
||||
) -> Result<(), DeletionQueueError> {
|
||||
DELETION_QUEUE_SUBMITTED.inc_by(layers.len() as u64);
|
||||
self.do_push(FrontendQueueMessage::Delete(DeletionOp {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
layers,
|
||||
generation,
|
||||
objects: Vec::new(),
|
||||
}))
|
||||
.await
|
||||
}
|
||||
|
||||
async fn do_flush(
|
||||
&self,
|
||||
msg: FrontendQueueMessage,
|
||||
rx: tokio::sync::oneshot::Receiver<()>,
|
||||
) -> Result<(), DeletionQueueError> {
|
||||
self.do_push(msg).await?;
|
||||
if rx.await.is_err() {
|
||||
// This shouldn't happen if tenants are shut down before deletion queue. If we
|
||||
// encounter a bug like this, then a flusher will incorrectly believe it has flushed
|
||||
// when it hasn't, possibly leading to leaking objects.
|
||||
error!("Deletion queue dropped flush op while client was still waiting");
|
||||
Err(DeletionQueueError::ShuttingDown)
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Wait until all previous deletions are persistent (either executed, or written to a DeletionList)
|
||||
pub async fn flush(&self) -> Result<(), DeletionQueueError> {
|
||||
let (tx, rx) = tokio::sync::oneshot::channel::<()>();
|
||||
self.do_flush(FrontendQueueMessage::Flush(FlushOp { tx }), rx)
|
||||
.await
|
||||
}
|
||||
|
||||
// Wait until all previous deletions are executed
|
||||
pub(crate) async fn flush_execute(&self) -> Result<(), DeletionQueueError> {
|
||||
debug!("flush_execute: flushing to deletion lists...");
|
||||
// Flush any buffered work to deletion lists
|
||||
self.flush().await?;
|
||||
|
||||
// Flush execution of deletion lists
|
||||
let (tx, rx) = tokio::sync::oneshot::channel::<()>();
|
||||
debug!("flush_execute: flushing execution...");
|
||||
self.do_flush(FrontendQueueMessage::FlushExecute(FlushOp { tx }), rx)
|
||||
.await?;
|
||||
debug!("flush_execute: finished flushing execution...");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// This interface bypasses the persistent deletion queue, and any validation
|
||||
/// that this pageserver is still elegible to execute the deletions. It is for
|
||||
/// use in timeline deletions, where the control plane is telling us we may
|
||||
/// delete everything in the timeline.
|
||||
///
|
||||
/// DO NOT USE THIS FROM GC OR COMPACTION CODE. Use the regular `push_layers`.
|
||||
pub(crate) async fn push_immediate(
|
||||
&self,
|
||||
objects: Vec<RemotePath>,
|
||||
) -> Result<(), DeletionQueueError> {
|
||||
self.executor_tx
|
||||
.send(ExecutorMessage::Delete(objects))
|
||||
.await
|
||||
.map_err(|_| DeletionQueueError::ShuttingDown)
|
||||
}
|
||||
|
||||
/// Companion to push_immediate. When this returns Ok, all prior objects sent
|
||||
/// into push_immediate have been deleted from remote storage.
|
||||
pub(crate) async fn flush_immediate(&self) -> Result<(), DeletionQueueError> {
|
||||
let (tx, rx) = tokio::sync::oneshot::channel::<()>();
|
||||
self.executor_tx
|
||||
.send(ExecutorMessage::Flush(FlushOp { tx }))
|
||||
.await
|
||||
.map_err(|_| DeletionQueueError::ShuttingDown)?;
|
||||
|
||||
rx.await.map_err(|_| DeletionQueueError::ShuttingDown)
|
||||
}
|
||||
}
|
||||
|
||||
impl DeletionQueue {
|
||||
pub fn new_client(&self) -> DeletionQueueClient {
|
||||
self.client.clone()
|
||||
}
|
||||
|
||||
/// Caller may use the returned object to construct clients with new_client.
|
||||
/// Caller should tokio::spawn the background() members of the two worker objects returned:
|
||||
/// we don't spawn those inside new() so that the caller can use their runtime/spans of choice.
|
||||
///
|
||||
/// If remote_storage is None, then the returned workers will also be None.
|
||||
pub fn new(
|
||||
remote_storage: Option<GenericRemoteStorage>,
|
||||
conf: &'static PageServerConf,
|
||||
cancel: CancellationToken,
|
||||
) -> (
|
||||
Self,
|
||||
Option<FrontendQueueWorker>,
|
||||
Option<BackendQueueWorker>,
|
||||
Option<ExecutorWorker>,
|
||||
) {
|
||||
// Deep channel: it consumes deletions from all timelines and we do not want to block them
|
||||
let (tx, rx) = tokio::sync::mpsc::channel(16384);
|
||||
|
||||
// Shallow channel: it carries DeletionLists which each contain up to thousands of deletions
|
||||
let (backend_tx, backend_rx) = tokio::sync::mpsc::channel(16);
|
||||
|
||||
// Shallow channel: it carries lists of paths, and we expect the main queueing to
|
||||
// happen in the backend (persistent), not in this queue.
|
||||
let (executor_tx, executor_rx) = tokio::sync::mpsc::channel(16);
|
||||
|
||||
let remote_storage = match remote_storage {
|
||||
None => {
|
||||
return (
|
||||
Self {
|
||||
client: DeletionQueueClient { tx, executor_tx },
|
||||
},
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
)
|
||||
}
|
||||
Some(r) => r,
|
||||
};
|
||||
|
||||
(
|
||||
Self {
|
||||
client: DeletionQueueClient {
|
||||
tx,
|
||||
executor_tx: executor_tx.clone(),
|
||||
},
|
||||
},
|
||||
Some(FrontendQueueWorker::new(
|
||||
conf,
|
||||
rx,
|
||||
backend_tx,
|
||||
cancel.clone(),
|
||||
)),
|
||||
Some(BackendQueueWorker::new(
|
||||
conf,
|
||||
backend_rx,
|
||||
executor_tx,
|
||||
cancel.clone(),
|
||||
)),
|
||||
Some(ExecutorWorker::new(
|
||||
remote_storage,
|
||||
executor_rx,
|
||||
cancel.clone(),
|
||||
)),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use hex_literal::hex;
|
||||
use std::{
|
||||
io::ErrorKind,
|
||||
path::{Path, PathBuf},
|
||||
};
|
||||
use tracing::info;
|
||||
|
||||
use remote_storage::{RemoteStorageConfig, RemoteStorageKind};
|
||||
use tokio::{runtime::EnterGuard, task::JoinHandle};
|
||||
|
||||
use crate::tenant::{harness::TenantHarness, remote_timeline_client::remote_timeline_path};
|
||||
|
||||
use super::*;
|
||||
pub const TIMELINE_ID: TimelineId =
|
||||
TimelineId::from_array(hex!("11223344556677881122334455667788"));
|
||||
|
||||
struct TestSetup {
|
||||
runtime: &'static tokio::runtime::Runtime,
|
||||
_entered_runtime: EnterGuard<'static>,
|
||||
harness: TenantHarness,
|
||||
remote_fs_dir: PathBuf,
|
||||
storage: GenericRemoteStorage,
|
||||
deletion_queue: DeletionQueue,
|
||||
fe_worker: JoinHandle<()>,
|
||||
be_worker: JoinHandle<()>,
|
||||
ex_worker: JoinHandle<()>,
|
||||
}
|
||||
|
||||
impl TestSetup {
|
||||
/// Simulate a pageserver restart by destroying and recreating the deletion queue
|
||||
fn restart(&mut self) {
|
||||
let (deletion_queue, fe_worker, be_worker, ex_worker) = DeletionQueue::new(
|
||||
Some(self.storage.clone()),
|
||||
self.harness.conf,
|
||||
CancellationToken::new(),
|
||||
);
|
||||
|
||||
self.deletion_queue = deletion_queue;
|
||||
|
||||
let mut fe_worker = fe_worker.unwrap();
|
||||
let mut be_worker = be_worker.unwrap();
|
||||
let mut ex_worker = ex_worker.unwrap();
|
||||
let mut fe_worker = self
|
||||
.runtime
|
||||
.spawn(async move { fe_worker.background().await });
|
||||
let mut be_worker = self
|
||||
.runtime
|
||||
.spawn(async move { be_worker.background().await });
|
||||
let mut ex_worker = self.runtime.spawn(async move {
|
||||
drop(ex_worker.background().await);
|
||||
});
|
||||
std::mem::swap(&mut self.fe_worker, &mut fe_worker);
|
||||
std::mem::swap(&mut self.be_worker, &mut be_worker);
|
||||
std::mem::swap(&mut self.ex_worker, &mut ex_worker);
|
||||
|
||||
// Join the old workers
|
||||
self.runtime.block_on(fe_worker).unwrap();
|
||||
self.runtime.block_on(be_worker).unwrap();
|
||||
self.runtime.block_on(ex_worker).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
fn setup(test_name: &str) -> anyhow::Result<TestSetup> {
|
||||
let test_name = Box::leak(Box::new(format!("deletion_queue__{test_name}")));
|
||||
let harness = TenantHarness::create(test_name)?;
|
||||
|
||||
// We do not load() the harness: we only need its config and remote_storage
|
||||
|
||||
// Set up a GenericRemoteStorage targetting a directory
|
||||
let remote_fs_dir = harness.conf.workdir.join("remote_fs");
|
||||
std::fs::create_dir_all(remote_fs_dir)?;
|
||||
let remote_fs_dir = std::fs::canonicalize(harness.conf.workdir.join("remote_fs"))?;
|
||||
let storage_config = RemoteStorageConfig {
|
||||
max_concurrent_syncs: std::num::NonZeroUsize::new(
|
||||
remote_storage::DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS,
|
||||
)
|
||||
.unwrap(),
|
||||
max_sync_errors: std::num::NonZeroU32::new(
|
||||
remote_storage::DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS,
|
||||
)
|
||||
.unwrap(),
|
||||
storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()),
|
||||
};
|
||||
let storage = GenericRemoteStorage::from_config(&storage_config).unwrap();
|
||||
|
||||
let runtime = Box::leak(Box::new(
|
||||
tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()?,
|
||||
));
|
||||
let entered_runtime = runtime.enter();
|
||||
|
||||
let (deletion_queue, fe_worker, be_worker, ex_worker) = DeletionQueue::new(
|
||||
Some(storage.clone()),
|
||||
harness.conf,
|
||||
CancellationToken::new(),
|
||||
);
|
||||
|
||||
let mut fe_worker = fe_worker.unwrap();
|
||||
let mut be_worker = be_worker.unwrap();
|
||||
let mut ex_worker = ex_worker.unwrap();
|
||||
let fe_worker_join = runtime.spawn(async move { fe_worker.background().await });
|
||||
let be_worker_join = runtime.spawn(async move { be_worker.background().await });
|
||||
let ex_worker_join = runtime.spawn(async move {
|
||||
drop(ex_worker.background().await);
|
||||
});
|
||||
|
||||
Ok(TestSetup {
|
||||
runtime,
|
||||
_entered_runtime: entered_runtime,
|
||||
harness,
|
||||
remote_fs_dir,
|
||||
storage,
|
||||
deletion_queue,
|
||||
fe_worker: fe_worker_join,
|
||||
be_worker: be_worker_join,
|
||||
ex_worker: ex_worker_join,
|
||||
})
|
||||
}
|
||||
|
||||
// TODO: put this in a common location so that we can share with remote_timeline_client's tests
|
||||
fn assert_remote_files(expected: &[&str], remote_path: &Path) {
|
||||
let mut expected: Vec<String> = expected.iter().map(|x| String::from(*x)).collect();
|
||||
expected.sort();
|
||||
|
||||
let mut found: Vec<String> = Vec::new();
|
||||
let dir = match std::fs::read_dir(remote_path) {
|
||||
Ok(d) => d,
|
||||
Err(e) => {
|
||||
if e.kind() == ErrorKind::NotFound {
|
||||
if expected.is_empty() {
|
||||
// We are asserting prefix is empty: it is expected that the dir is missing
|
||||
return;
|
||||
} else {
|
||||
assert_eq!(expected, Vec::<String>::new());
|
||||
unreachable!();
|
||||
}
|
||||
} else {
|
||||
panic!(
|
||||
"Unexpected error listing {0}: {e}",
|
||||
remote_path.to_string_lossy()
|
||||
);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
for entry in dir.flatten() {
|
||||
let entry_name = entry.file_name();
|
||||
let fname = entry_name.to_str().unwrap();
|
||||
found.push(String::from(fname));
|
||||
}
|
||||
found.sort();
|
||||
|
||||
assert_eq!(expected, found);
|
||||
}
|
||||
|
||||
fn assert_local_files(expected: &[&str], directory: &Path) {
|
||||
let mut dir = match std::fs::read_dir(directory) {
|
||||
Ok(d) => d,
|
||||
Err(_) => {
|
||||
assert_eq!(expected, &Vec::<String>::new());
|
||||
return;
|
||||
}
|
||||
};
|
||||
let mut found = Vec::new();
|
||||
while let Some(dentry) = dir.next() {
|
||||
let dentry = dentry.unwrap();
|
||||
let file_name = dentry.file_name();
|
||||
let file_name_str = file_name.to_string_lossy();
|
||||
found.push(file_name_str.to_string());
|
||||
}
|
||||
found.sort();
|
||||
assert_eq!(expected, found);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn deletion_queue_smoke() -> anyhow::Result<()> {
|
||||
// Basic test that the deletion queue processes the deletions we pass into it
|
||||
let ctx = setup("deletion_queue_smoke").expect("Failed test setup");
|
||||
let client = ctx.deletion_queue.new_client();
|
||||
|
||||
let layer_file_name_1: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap();
|
||||
let tenant_id = ctx.harness.tenant_id;
|
||||
|
||||
let content: Vec<u8> = "victim1 contents".into();
|
||||
let relative_remote_path = remote_timeline_path(&tenant_id, &TIMELINE_ID);
|
||||
let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path());
|
||||
let deletion_prefix = ctx.harness.conf.deletion_prefix();
|
||||
|
||||
// Exercise the distinction between the generation of the layers
|
||||
// we delete, and the generation of the running Tenant.
|
||||
let layer_generation = Generation::new(0xdeadbeef);
|
||||
let now_generation = Generation::new(0xfeedbeef);
|
||||
|
||||
let remote_layer_file_name_1 =
|
||||
format!("{}{}", layer_file_name_1, layer_generation.get_suffix());
|
||||
|
||||
// Inject a victim file to remote storage
|
||||
info!("Writing");
|
||||
std::fs::create_dir_all(&remote_timeline_path)?;
|
||||
std::fs::write(
|
||||
remote_timeline_path.join(remote_layer_file_name_1.clone()),
|
||||
content,
|
||||
)?;
|
||||
assert_remote_files(&[&remote_layer_file_name_1], &remote_timeline_path);
|
||||
|
||||
// File should still be there after we push it to the queue (we haven't pushed enough to flush anything)
|
||||
info!("Pushing");
|
||||
ctx.runtime.block_on(client.push_layers(
|
||||
tenant_id,
|
||||
TIMELINE_ID,
|
||||
now_generation,
|
||||
[(layer_file_name_1.clone(), layer_generation)].to_vec(),
|
||||
))?;
|
||||
assert_remote_files(&[&remote_layer_file_name_1], &remote_timeline_path);
|
||||
|
||||
assert_local_files(&[], &deletion_prefix);
|
||||
|
||||
// File should still be there after we write a deletion list (we haven't pushed enough to execute anything)
|
||||
info!("Flushing");
|
||||
ctx.runtime.block_on(client.flush())?;
|
||||
assert_remote_files(&[&remote_layer_file_name_1], &remote_timeline_path);
|
||||
assert_local_files(&["0000000000000001-01.list"], &deletion_prefix);
|
||||
|
||||
// File should go away when we execute
|
||||
info!("Flush-executing");
|
||||
ctx.runtime.block_on(client.flush_execute())?;
|
||||
assert_remote_files(&[], &remote_timeline_path);
|
||||
assert_local_files(&["header-01"], &deletion_prefix);
|
||||
|
||||
// Flushing on an empty queue should succeed immediately, and not write any lists
|
||||
info!("Flush-executing on empty");
|
||||
ctx.runtime.block_on(client.flush_execute())?;
|
||||
assert_local_files(&["header-01"], &deletion_prefix);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn deletion_queue_recovery() -> anyhow::Result<()> {
|
||||
// Basic test that the deletion queue processes the deletions we pass into it
|
||||
let mut ctx = setup("deletion_queue_recovery").expect("Failed test setup");
|
||||
let client = ctx.deletion_queue.new_client();
|
||||
|
||||
let layer_file_name_1: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap();
|
||||
let tenant_id = ctx.harness.tenant_id;
|
||||
|
||||
let content: Vec<u8> = "victim1 contents".into();
|
||||
let relative_remote_path = remote_timeline_path(&tenant_id, &TIMELINE_ID);
|
||||
let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path());
|
||||
let deletion_prefix = ctx.harness.conf.deletion_prefix();
|
||||
let layer_generation = Generation::new(0xdeadbeef);
|
||||
let now_generation = Generation::new(0xfeedbeef);
|
||||
let remote_layer_file_name_1 =
|
||||
format!("{}{}", layer_file_name_1, layer_generation.get_suffix());
|
||||
|
||||
// Inject a file, delete it, and flush to a deletion list
|
||||
std::fs::create_dir_all(&remote_timeline_path)?;
|
||||
std::fs::write(
|
||||
remote_timeline_path.join(remote_layer_file_name_1.clone()),
|
||||
content,
|
||||
)?;
|
||||
ctx.runtime.block_on(client.push_layers(
|
||||
tenant_id,
|
||||
TIMELINE_ID,
|
||||
now_generation,
|
||||
[(layer_file_name_1.clone(), layer_generation)].to_vec(),
|
||||
))?;
|
||||
ctx.runtime.block_on(client.flush())?;
|
||||
assert_local_files(&["0000000000000001-01.list"], &deletion_prefix);
|
||||
|
||||
// Restart the deletion queue
|
||||
drop(client);
|
||||
ctx.restart();
|
||||
let client = ctx.deletion_queue.new_client();
|
||||
|
||||
// If we have recovered the deletion list properly, then executing after restart should purge it
|
||||
info!("Flush-executing");
|
||||
ctx.runtime.block_on(client.flush_execute())?;
|
||||
assert_remote_files(&[], &remote_timeline_path);
|
||||
assert_local_files(&["header-01"], &deletion_prefix);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// A lightweight queue which can issue ordinary DeletionQueueClient objects, but doesn't do any persistence
|
||||
/// or coalescing, and doesn't actually execute any deletions unless you call pump() to kick it.
|
||||
#[cfg(test)]
|
||||
pub mod mock {
|
||||
use tracing::info;
|
||||
|
||||
use crate::tenant::remote_timeline_client::remote_layer_path;
|
||||
|
||||
use super::*;
|
||||
use std::sync::{
|
||||
atomic::{AtomicUsize, Ordering},
|
||||
Arc,
|
||||
};
|
||||
|
||||
pub struct MockDeletionQueue {
|
||||
tx: tokio::sync::mpsc::Sender<FrontendQueueMessage>,
|
||||
executor_tx: tokio::sync::mpsc::Sender<ExecutorMessage>,
|
||||
tx_pump: tokio::sync::mpsc::Sender<FlushOp>,
|
||||
executed: Arc<AtomicUsize>,
|
||||
}
|
||||
|
||||
impl MockDeletionQueue {
|
||||
pub fn new(remote_storage: Option<GenericRemoteStorage>) -> Self {
|
||||
let (tx, mut rx) = tokio::sync::mpsc::channel(16384);
|
||||
let (tx_pump, mut rx_pump) = tokio::sync::mpsc::channel::<FlushOp>(1);
|
||||
let (executor_tx, mut executor_rx) = tokio::sync::mpsc::channel(16384);
|
||||
|
||||
let executed = Arc::new(AtomicUsize::new(0));
|
||||
let executed_bg = executed.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
let remote_storage = match &remote_storage {
|
||||
Some(rs) => rs,
|
||||
None => {
|
||||
info!("No remote storage configured, deletion queue will not run");
|
||||
return;
|
||||
}
|
||||
};
|
||||
info!("Running mock deletion queue");
|
||||
// Each time we are asked to pump, drain the queue of deletions
|
||||
while let Some(flush_op) = rx_pump.recv().await {
|
||||
info!("Executing all pending deletions");
|
||||
|
||||
// Transform all executor messages to generic frontend messages
|
||||
while let Ok(msg) = executor_rx.try_recv() {
|
||||
match msg {
|
||||
ExecutorMessage::Delete(objects) => {
|
||||
for path in objects {
|
||||
match remote_storage.delete(&path).await {
|
||||
Ok(_) => {
|
||||
debug!("Deleted {path}");
|
||||
}
|
||||
Err(e) => {
|
||||
error!(
|
||||
"Failed to delete {path}, leaking object! ({e})"
|
||||
);
|
||||
}
|
||||
}
|
||||
executed_bg.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
ExecutorMessage::Flush(flush_op) => {
|
||||
flush_op.fire();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
while let Ok(msg) = rx.try_recv() {
|
||||
match msg {
|
||||
FrontendQueueMessage::Delete(op) => {
|
||||
let mut objects = op.objects;
|
||||
for (layer, generation) in op.layers {
|
||||
objects.push(remote_layer_path(
|
||||
&op.tenant_id,
|
||||
&op.timeline_id,
|
||||
&layer,
|
||||
generation,
|
||||
));
|
||||
}
|
||||
|
||||
for path in objects {
|
||||
info!("Executing deletion {path}");
|
||||
match remote_storage.delete(&path).await {
|
||||
Ok(_) => {
|
||||
debug!("Deleted {path}");
|
||||
}
|
||||
Err(e) => {
|
||||
error!(
|
||||
"Failed to delete {path}, leaking object! ({e})"
|
||||
);
|
||||
}
|
||||
}
|
||||
executed_bg.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
FrontendQueueMessage::Flush(op) => {
|
||||
op.fire();
|
||||
}
|
||||
FrontendQueueMessage::FlushExecute(op) => {
|
||||
// We have already executed all prior deletions because mock does them inline
|
||||
op.fire();
|
||||
}
|
||||
}
|
||||
info!("All pending deletions have been executed");
|
||||
}
|
||||
flush_op
|
||||
.tx
|
||||
.send(())
|
||||
.expect("Test called flush but dropped before finishing");
|
||||
}
|
||||
});
|
||||
|
||||
Self {
|
||||
tx,
|
||||
tx_pump,
|
||||
executor_tx,
|
||||
executed,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_executed(&self) -> usize {
|
||||
self.executed.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
pub async fn pump(&self) {
|
||||
let (tx, rx) = tokio::sync::oneshot::channel();
|
||||
self.tx_pump
|
||||
.send(FlushOp { tx })
|
||||
.await
|
||||
.expect("pump called after deletion queue loop stopped");
|
||||
rx.await
|
||||
.expect("Mock delete queue shutdown while waiting to pump");
|
||||
}
|
||||
|
||||
pub(crate) fn new_client(&self) -> DeletionQueueClient {
|
||||
DeletionQueueClient {
|
||||
tx: self.tx.clone(),
|
||||
executor_tx: self.executor_tx.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
300
pageserver/src/deletion_queue/backend.rs
Normal file
300
pageserver/src/deletion_queue/backend.rs
Normal file
@@ -0,0 +1,300 @@
|
||||
use std::collections::HashMap;
|
||||
use std::time::Duration;
|
||||
|
||||
use futures::future::TryFutureExt;
|
||||
use pageserver_api::control_api::HexTenantId;
|
||||
use pageserver_api::control_api::{ValidateRequest, ValidateRequestTenant, ValidateResponse};
|
||||
use serde::de::DeserializeOwned;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::debug;
|
||||
use tracing::info;
|
||||
use tracing::warn;
|
||||
use utils::backoff;
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
use crate::metrics::DELETION_QUEUE_ERRORS;
|
||||
|
||||
use super::executor::ExecutorMessage;
|
||||
use super::DeletionHeader;
|
||||
use super::DeletionList;
|
||||
use super::DeletionQueueError;
|
||||
use super::FlushOp;
|
||||
|
||||
// After this length of time, execute deletions which are elegible to run,
|
||||
// even if we haven't accumulated enough for a full-sized DeleteObjects
|
||||
const EXECUTE_IDLE_DEADLINE: Duration = Duration::from_secs(60);
|
||||
|
||||
// If we have received this number of keys, proceed with attempting to execute
|
||||
const AUTOFLUSH_KEY_COUNT: usize = 16384;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(super) enum BackendQueueMessage {
|
||||
Delete(DeletionList),
|
||||
Flush(FlushOp),
|
||||
}
|
||||
pub struct BackendQueueWorker {
|
||||
conf: &'static PageServerConf,
|
||||
rx: tokio::sync::mpsc::Receiver<BackendQueueMessage>,
|
||||
tx: tokio::sync::mpsc::Sender<ExecutorMessage>,
|
||||
|
||||
// Accumulate some lists to execute in a batch.
|
||||
// The purpose of this accumulation is to implement batched validation of
|
||||
// attachment generations, when split-brain protection is implemented.
|
||||
// (see https://github.com/neondatabase/neon/pull/4919)
|
||||
pending_lists: Vec<DeletionList>,
|
||||
|
||||
// Sum of all the lengths of lists in pending_lists
|
||||
pending_key_count: usize,
|
||||
|
||||
// DeletionLists we have fully executed, which may be deleted
|
||||
// from remote storage.
|
||||
executed_lists: Vec<DeletionList>,
|
||||
|
||||
cancel: CancellationToken,
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
enum ValidateCallError {
|
||||
#[error("shutdown")]
|
||||
Shutdown,
|
||||
#[error("remote: {0}")]
|
||||
Remote(reqwest::Error),
|
||||
}
|
||||
|
||||
async fn retry_http_forever<T>(
|
||||
url: &url::Url,
|
||||
request: ValidateRequest,
|
||||
cancel: CancellationToken,
|
||||
) -> Result<T, DeletionQueueError>
|
||||
where
|
||||
T: DeserializeOwned,
|
||||
{
|
||||
let client = reqwest::ClientBuilder::new()
|
||||
.build()
|
||||
.expect("Failed to construct http client");
|
||||
|
||||
let response = match backoff::retry(
|
||||
|| {
|
||||
client
|
||||
.post(url.clone())
|
||||
.json(&request)
|
||||
.send()
|
||||
.map_err(|e| ValidateCallError::Remote(e))
|
||||
},
|
||||
|_| false,
|
||||
3,
|
||||
u32::MAX,
|
||||
"calling control plane generation validation API",
|
||||
backoff::Cancel::new(cancel.clone(), || ValidateCallError::Shutdown),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Err(ValidateCallError::Shutdown) => {
|
||||
return Err(DeletionQueueError::ShuttingDown);
|
||||
}
|
||||
Err(ValidateCallError::Remote(_)) => {
|
||||
panic!("We retry forever");
|
||||
}
|
||||
Ok(r) => r,
|
||||
};
|
||||
|
||||
// TODO: handle non-200 response
|
||||
// TODO: handle decode error
|
||||
Ok(response.json::<T>().await.unwrap())
|
||||
}
|
||||
|
||||
impl BackendQueueWorker {
|
||||
pub(super) fn new(
|
||||
conf: &'static PageServerConf,
|
||||
rx: tokio::sync::mpsc::Receiver<BackendQueueMessage>,
|
||||
tx: tokio::sync::mpsc::Sender<ExecutorMessage>,
|
||||
cancel: CancellationToken,
|
||||
) -> Self {
|
||||
Self {
|
||||
conf,
|
||||
rx,
|
||||
tx,
|
||||
pending_lists: Vec::new(),
|
||||
pending_key_count: 0,
|
||||
executed_lists: Vec::new(),
|
||||
cancel,
|
||||
}
|
||||
}
|
||||
|
||||
async fn cleanup_lists(&mut self) {
|
||||
debug!(
|
||||
"cleanup_lists: {0} executed lists, {1} pending lists",
|
||||
self.executed_lists.len(),
|
||||
self.pending_lists.len()
|
||||
);
|
||||
|
||||
// Lists are always pushed into the queues + executed list in sequence order, so
|
||||
// no sort is required: can find the highest sequence number by peeking at last element
|
||||
let max_executed_seq = match self.executed_lists.last() {
|
||||
Some(v) => v.sequence,
|
||||
None => {
|
||||
// No executed lists, nothing to clean up.
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
// In case this is the last list, write a header out first so that
|
||||
// we don't risk losing our knowledge of the sequence number (on replay, our
|
||||
// next sequence number is the highest list seen + 1, or read from the header
|
||||
// if there are no lists)
|
||||
let header = DeletionHeader::new(max_executed_seq);
|
||||
debug!("Writing header {:?}", header);
|
||||
let header_bytes =
|
||||
serde_json::to_vec(&header).expect("Failed to serialize deletion header");
|
||||
let header_path = self.conf.deletion_header_path();
|
||||
|
||||
if let Err(e) = tokio::fs::write(&header_path, header_bytes).await {
|
||||
warn!("Failed to upload deletion queue header: {e:#}");
|
||||
DELETION_QUEUE_ERRORS
|
||||
.with_label_values(&["put_header"])
|
||||
.inc();
|
||||
return;
|
||||
}
|
||||
|
||||
while let Some(list) = self.executed_lists.pop() {
|
||||
let list_path = self.conf.deletion_list_path(list.sequence);
|
||||
if let Err(e) = tokio::fs::remove_file(&list_path).await {
|
||||
// Unexpected: we should have permissions and nothing else should
|
||||
// be touching these files
|
||||
tracing::error!("Failed to delete {0}: {e:#}", list_path.display());
|
||||
self.executed_lists.push(list);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn validate_lists(&mut self) -> Result<(), DeletionQueueError> {
|
||||
let control_plane_api = match &self.conf.control_plane_api {
|
||||
None => {
|
||||
// Generations are not switched on yet.
|
||||
return Ok(());
|
||||
}
|
||||
Some(api) => api,
|
||||
};
|
||||
|
||||
let validate_path = control_plane_api
|
||||
.join("validate")
|
||||
.expect("Failed to build validate path");
|
||||
|
||||
for list in &mut self.pending_lists {
|
||||
let request = ValidateRequest {
|
||||
tenants: list
|
||||
.tenants
|
||||
.iter()
|
||||
.map(|(tid, tdl)| ValidateRequestTenant {
|
||||
id: HexTenantId::new(*tid),
|
||||
gen: tdl.generation.into().expect(
|
||||
"Generation should always be valid for a Tenant doing deletions",
|
||||
),
|
||||
})
|
||||
.collect(),
|
||||
};
|
||||
|
||||
// Retry forever, we cannot make progress until we get a response
|
||||
let response: ValidateResponse =
|
||||
retry_http_forever(&validate_path, request, self.cancel.clone()).await?;
|
||||
|
||||
let tenants_valid: HashMap<_, _> = response
|
||||
.tenants
|
||||
.into_iter()
|
||||
.map(|t| (t.id.take(), t.valid))
|
||||
.collect();
|
||||
|
||||
// Filter the list based on whether the server responded valid: true.
|
||||
// If a tenant is omitted in the response, it has been deleted, and we should
|
||||
// proceed with deletion.
|
||||
list.tenants.retain(|tenant_id, _tenant| {
|
||||
let r = tenants_valid.get(tenant_id).map(|v| *v).unwrap_or(true);
|
||||
if !r {
|
||||
warn!("Dropping stale deletions for tenant {tenant_id}, objects may be leaked");
|
||||
}
|
||||
r
|
||||
});
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn flush(&mut self) {
|
||||
// Issue any required generation validation calls to the control plane
|
||||
if let Err(DeletionQueueError::ShuttingDown) = self.validate_lists().await {
|
||||
warn!("Shutting down");
|
||||
return;
|
||||
}
|
||||
|
||||
// Submit all keys from pending DeletionLists into the executor
|
||||
for list in self.pending_lists.drain(..) {
|
||||
let objects = list.take_paths();
|
||||
if let Err(_e) = self.tx.send(ExecutorMessage::Delete(objects)).await {
|
||||
warn!("Shutting down");
|
||||
return;
|
||||
};
|
||||
}
|
||||
|
||||
// Flush the executor to ensure all the operations we just submitted have been executed
|
||||
let (tx, rx) = tokio::sync::oneshot::channel::<()>();
|
||||
let flush_op = FlushOp { tx };
|
||||
if let Err(_e) = self.tx.send(ExecutorMessage::Flush(flush_op)).await {
|
||||
warn!("Shutting down");
|
||||
return;
|
||||
};
|
||||
if rx.await.is_err() {
|
||||
warn!("Shutting down");
|
||||
return;
|
||||
}
|
||||
|
||||
// After flush, we are assured that all contents of the pending lists
|
||||
// are executed
|
||||
self.pending_key_count = 0;
|
||||
self.executed_lists.append(&mut self.pending_lists);
|
||||
|
||||
// Erase the lists we executed
|
||||
self.cleanup_lists().await;
|
||||
}
|
||||
|
||||
pub async fn background(&mut self) {
|
||||
// TODO: if we would like to be able to defer deletions while a Layer still has
|
||||
// refs (but it will be elegible for deletion after process ends), then we may
|
||||
// add an ephemeral part to BackendQueueMessage::Delete that tracks which keys
|
||||
// in the deletion list may not be deleted yet, with guards to block on while
|
||||
// we wait to proceed.
|
||||
|
||||
loop {
|
||||
let msg = match tokio::time::timeout(EXECUTE_IDLE_DEADLINE, self.rx.recv()).await {
|
||||
Ok(Some(m)) => m,
|
||||
Ok(None) => {
|
||||
// All queue senders closed
|
||||
info!("Shutting down");
|
||||
break;
|
||||
}
|
||||
Err(_) => {
|
||||
// Timeout, we hit deadline to execute whatever we have in hand. These functions will
|
||||
// return immediately if no work is pending
|
||||
self.flush().await;
|
||||
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
match msg {
|
||||
BackendQueueMessage::Delete(list) => {
|
||||
self.pending_key_count += list.len();
|
||||
self.pending_lists.push(list);
|
||||
|
||||
if self.pending_key_count > AUTOFLUSH_KEY_COUNT {
|
||||
self.flush().await;
|
||||
}
|
||||
}
|
||||
BackendQueueMessage::Flush(op) => {
|
||||
self.flush().await;
|
||||
op.fire();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
143
pageserver/src/deletion_queue/executor.rs
Normal file
143
pageserver/src/deletion_queue/executor.rs
Normal file
@@ -0,0 +1,143 @@
|
||||
use remote_storage::GenericRemoteStorage;
|
||||
use remote_storage::RemotePath;
|
||||
use remote_storage::MAX_KEYS_PER_DELETE;
|
||||
use std::time::Duration;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::info;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::metrics::DELETION_QUEUE_ERRORS;
|
||||
use crate::metrics::DELETION_QUEUE_EXECUTED;
|
||||
|
||||
use super::DeletionQueueError;
|
||||
use super::FlushOp;
|
||||
|
||||
const AUTOFLUSH_INTERVAL: Duration = Duration::from_secs(10);
|
||||
|
||||
pub(super) enum ExecutorMessage {
|
||||
Delete(Vec<RemotePath>),
|
||||
Flush(FlushOp),
|
||||
}
|
||||
|
||||
/// Non-persistent deletion queue, for coalescing multiple object deletes into
|
||||
/// larger DeleteObjects requests.
|
||||
pub struct ExecutorWorker {
|
||||
// Accumulate up to 1000 keys for the next deletion operation
|
||||
accumulator: Vec<RemotePath>,
|
||||
|
||||
rx: tokio::sync::mpsc::Receiver<ExecutorMessage>,
|
||||
|
||||
cancel: CancellationToken,
|
||||
remote_storage: GenericRemoteStorage,
|
||||
}
|
||||
|
||||
impl ExecutorWorker {
|
||||
pub(super) fn new(
|
||||
remote_storage: GenericRemoteStorage,
|
||||
rx: tokio::sync::mpsc::Receiver<ExecutorMessage>,
|
||||
cancel: CancellationToken,
|
||||
) -> Self {
|
||||
Self {
|
||||
remote_storage,
|
||||
rx,
|
||||
cancel,
|
||||
accumulator: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Wrap the remote `delete_objects` with a failpoint
|
||||
pub async fn remote_delete(&self) -> Result<(), anyhow::Error> {
|
||||
fail::fail_point!("deletion-queue-before-execute", |_| {
|
||||
info!("Skipping execution, failpoint set");
|
||||
DELETION_QUEUE_ERRORS
|
||||
.with_label_values(&["failpoint"])
|
||||
.inc();
|
||||
Err(anyhow::anyhow!("failpoint hit"))
|
||||
});
|
||||
|
||||
self.remote_storage.delete_objects(&self.accumulator).await
|
||||
}
|
||||
|
||||
/// Block until everything in accumulator has been executed
|
||||
pub async fn flush(&mut self) -> Result<(), DeletionQueueError> {
|
||||
while !self.accumulator.is_empty() && !self.cancel.is_cancelled() {
|
||||
match self.remote_delete().await {
|
||||
Ok(()) => {
|
||||
// Note: we assume that the remote storage layer returns Ok(()) if some
|
||||
// or all of the deleted objects were already gone.
|
||||
DELETION_QUEUE_EXECUTED.inc_by(self.accumulator.len() as u64);
|
||||
info!(
|
||||
"Executed deletion batch {}..{}",
|
||||
self.accumulator
|
||||
.first()
|
||||
.expect("accumulator should be non-empty"),
|
||||
self.accumulator
|
||||
.last()
|
||||
.expect("accumulator should be non-empty"),
|
||||
);
|
||||
self.accumulator.clear();
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("DeleteObjects request failed: {e:#}, will retry");
|
||||
DELETION_QUEUE_ERRORS.with_label_values(&["execute"]).inc();
|
||||
}
|
||||
};
|
||||
}
|
||||
if self.cancel.is_cancelled() {
|
||||
// Expose an error because we may not have actually flushed everything
|
||||
Err(DeletionQueueError::ShuttingDown)
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn background(&mut self) -> Result<(), DeletionQueueError> {
|
||||
self.accumulator.reserve(MAX_KEYS_PER_DELETE);
|
||||
|
||||
loop {
|
||||
if self.cancel.is_cancelled() {
|
||||
return Err(DeletionQueueError::ShuttingDown);
|
||||
}
|
||||
|
||||
let msg = match tokio::time::timeout(AUTOFLUSH_INTERVAL, self.rx.recv()).await {
|
||||
Ok(Some(m)) => m,
|
||||
Ok(None) => {
|
||||
// All queue senders closed
|
||||
info!("Shutting down");
|
||||
return Err(DeletionQueueError::ShuttingDown);
|
||||
}
|
||||
Err(_) => {
|
||||
// Timeout, we hit deadline to execute whatever we have in hand. These functions will
|
||||
// return immediately if no work is pending
|
||||
self.flush().await?;
|
||||
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
match msg {
|
||||
ExecutorMessage::Delete(mut list) => {
|
||||
while !list.is_empty() || self.accumulator.len() == MAX_KEYS_PER_DELETE {
|
||||
if self.accumulator.len() == MAX_KEYS_PER_DELETE {
|
||||
self.flush().await?;
|
||||
// If we have received this number of keys, proceed with attempting to execute
|
||||
assert_eq!(self.accumulator.len(), 0);
|
||||
}
|
||||
|
||||
let available_slots = MAX_KEYS_PER_DELETE - self.accumulator.len();
|
||||
let take_count = std::cmp::min(available_slots, list.len());
|
||||
for path in list.drain(list.len() - take_count..) {
|
||||
self.accumulator.push(path);
|
||||
}
|
||||
}
|
||||
}
|
||||
ExecutorMessage::Flush(flush_op) => {
|
||||
// If flush() errors, we drop the flush_op and the caller will get
|
||||
// an error recv()'ing their oneshot channel.
|
||||
self.flush().await?;
|
||||
flush_op.fire();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
376
pageserver/src/deletion_queue/frontend.rs
Normal file
376
pageserver/src/deletion_queue/frontend.rs
Normal file
@@ -0,0 +1,376 @@
|
||||
use super::BackendQueueMessage;
|
||||
use super::DeletionHeader;
|
||||
use super::DeletionList;
|
||||
use super::FlushOp;
|
||||
|
||||
use std::fs::create_dir_all;
|
||||
use std::time::Duration;
|
||||
|
||||
use regex::Regex;
|
||||
use remote_storage::RemotePath;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::debug;
|
||||
use tracing::info;
|
||||
use tracing::warn;
|
||||
use utils::generation::Generation;
|
||||
use utils::id::TenantId;
|
||||
use utils::id::TimelineId;
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
use crate::metrics::DELETION_QUEUE_ERRORS;
|
||||
use crate::metrics::DELETION_QUEUE_SUBMITTED;
|
||||
use crate::tenant::remote_timeline_client::remote_layer_path;
|
||||
use crate::tenant::storage_layer::LayerFileName;
|
||||
|
||||
// The number of keys in a DeletionList before we will proactively persist it
|
||||
// (without reaching a flush deadline). This aims to deliver objects of the order
|
||||
// of magnitude 1MB when we are under heavy delete load.
|
||||
const DELETION_LIST_TARGET_SIZE: usize = 16384;
|
||||
|
||||
// Ordinarily, we only flush to DeletionList periodically, to bound the window during
|
||||
// which we might leak objects from not flushing a DeletionList after
|
||||
// the objects are already unlinked from timeline metadata.
|
||||
const FRONTEND_DEFAULT_TIMEOUT: Duration = Duration::from_millis(10000);
|
||||
|
||||
// If someone is waiting for a flush to DeletionList, only delay a little to accumulate
|
||||
// more objects before doing the flush.
|
||||
const FRONTEND_FLUSHING_TIMEOUT: Duration = Duration::from_millis(100);
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(super) struct DeletionOp {
|
||||
pub(super) tenant_id: TenantId,
|
||||
pub(super) timeline_id: TimelineId,
|
||||
// `layers` and `objects` are both just lists of objects. `layers` is used if you do not
|
||||
// have a config object handy to project it to a remote key, and need the consuming worker
|
||||
// to do it for you.
|
||||
pub(super) layers: Vec<(LayerFileName, Generation)>,
|
||||
pub(super) objects: Vec<RemotePath>,
|
||||
|
||||
/// The _current_ generation of the Tenant attachment in which we are enqueuing
|
||||
/// this deletion.
|
||||
pub(super) generation: Generation,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(super) enum FrontendQueueMessage {
|
||||
Delete(DeletionOp),
|
||||
// Wait until all prior deletions make it into a persistent DeletionList
|
||||
Flush(FlushOp),
|
||||
// Wait until all prior deletions have been executed (i.e. objects are actually deleted)
|
||||
FlushExecute(FlushOp),
|
||||
}
|
||||
|
||||
pub struct FrontendQueueWorker {
|
||||
conf: &'static PageServerConf,
|
||||
|
||||
// Incoming frontend requests to delete some keys
|
||||
rx: tokio::sync::mpsc::Receiver<FrontendQueueMessage>,
|
||||
|
||||
// Outbound requests to the backend to execute deletion lists we have composed.
|
||||
tx: tokio::sync::mpsc::Sender<BackendQueueMessage>,
|
||||
|
||||
// The list we are currently building, contains a buffer of keys to delete
|
||||
// and our next sequence number
|
||||
pending: DeletionList,
|
||||
|
||||
// These FlushOps should fire the next time we flush
|
||||
pending_flushes: Vec<FlushOp>,
|
||||
|
||||
// Worker loop is torn down when this fires.
|
||||
cancel: CancellationToken,
|
||||
}
|
||||
|
||||
impl FrontendQueueWorker {
|
||||
pub(super) fn new(
|
||||
conf: &'static PageServerConf,
|
||||
rx: tokio::sync::mpsc::Receiver<FrontendQueueMessage>,
|
||||
tx: tokio::sync::mpsc::Sender<BackendQueueMessage>,
|
||||
cancel: CancellationToken,
|
||||
) -> Self {
|
||||
Self {
|
||||
pending: DeletionList::new(1),
|
||||
conf,
|
||||
rx,
|
||||
tx,
|
||||
pending_flushes: Vec::new(),
|
||||
cancel,
|
||||
}
|
||||
}
|
||||
async fn upload_pending_list(&mut self) -> anyhow::Result<()> {
|
||||
let path = self.conf.deletion_list_path(self.pending.sequence);
|
||||
|
||||
let bytes = serde_json::to_vec(&self.pending).expect("Failed to serialize deletion list");
|
||||
tokio::fs::write(&path, &bytes).await?;
|
||||
tokio::fs::File::open(&path).await?.sync_all().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Try to flush `list` to persistent storage
|
||||
///
|
||||
/// This does not return errors, because on failure to flush we do not lose
|
||||
/// any state: flushing will be retried implicitly on the next deadline
|
||||
async fn flush(&mut self) {
|
||||
if self.pending.is_empty() {
|
||||
for f in self.pending_flushes.drain(..) {
|
||||
f.fire();
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
match self.upload_pending_list().await {
|
||||
Ok(_) => {
|
||||
info!(sequence = self.pending.sequence, "Stored deletion list");
|
||||
|
||||
for f in self.pending_flushes.drain(..) {
|
||||
f.fire();
|
||||
}
|
||||
|
||||
let onward_list = self.pending.drain();
|
||||
|
||||
// We have consumed out of pending: reset it for the next incoming deletions to accumulate there
|
||||
self.pending = DeletionList::new(self.pending.sequence + 1);
|
||||
|
||||
if let Err(e) = self.tx.send(BackendQueueMessage::Delete(onward_list)).await {
|
||||
// This is allowed to fail: it will only happen if the backend worker is shut down,
|
||||
// so we can just drop this on the floor.
|
||||
info!("Deletion list dropped, this is normal during shutdown ({e:#})");
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
DELETION_QUEUE_ERRORS.with_label_values(&["put_list"]).inc();
|
||||
warn!(
|
||||
sequence = self.pending.sequence,
|
||||
"Failed to write deletion list to remote storage, will retry later ({e:#})"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn recover(&mut self) -> Result<(), anyhow::Error> {
|
||||
// Load header: this is not required to be present, e.g. when a pageserver first runs
|
||||
let header_path = self.conf.deletion_header_path();
|
||||
|
||||
// Synchronous, but we only do it once per process lifetime so it's tolerable
|
||||
create_dir_all(&self.conf.deletion_prefix())?;
|
||||
|
||||
let header_bytes = match tokio::fs::read(&header_path).await {
|
||||
Ok(h) => Ok(Some(h)),
|
||||
Err(e) => {
|
||||
if e.kind() == std::io::ErrorKind::NotFound {
|
||||
debug!(
|
||||
"Deletion header {0} not found, first start?",
|
||||
header_path.display()
|
||||
);
|
||||
Ok(None)
|
||||
} else {
|
||||
Err(e)
|
||||
}
|
||||
}
|
||||
}?;
|
||||
|
||||
if let Some(header_bytes) = header_bytes {
|
||||
if let Some(header) = match serde_json::from_slice::<DeletionHeader>(&header_bytes) {
|
||||
Ok(h) => Some(h),
|
||||
Err(e) => {
|
||||
warn!(
|
||||
"Failed to deserialize deletion header, ignoring {0}: {e:#}",
|
||||
header_path.display()
|
||||
);
|
||||
// This should never happen unless we make a mistake with our serialization.
|
||||
// Ignoring a deletion header is not consequential for correctnes because all deletions
|
||||
// are ultimately allowed to fail: worst case we leak some objects for the scrubber to clean up.
|
||||
None
|
||||
}
|
||||
} {
|
||||
self.pending.sequence =
|
||||
std::cmp::max(self.pending.sequence, header.last_deleted_list_seq + 1);
|
||||
};
|
||||
};
|
||||
|
||||
let mut dir = match tokio::fs::read_dir(&self.conf.deletion_prefix()).await {
|
||||
Ok(d) => d,
|
||||
Err(e) => {
|
||||
warn!(
|
||||
"Failed to open deletion list directory {0}: {e:#}",
|
||||
header_path.display()
|
||||
);
|
||||
|
||||
// Give up: if we can't read the deletion list directory, we probably can't
|
||||
// write lists into it later, so the queue won't work.
|
||||
return Err(e.into());
|
||||
}
|
||||
};
|
||||
|
||||
let list_name_pattern = Regex::new("([a-zA-Z0-9]{16})-([a-zA-Z0-9]{2}).list").unwrap();
|
||||
|
||||
let mut seqs: Vec<u64> = Vec::new();
|
||||
while let Some(dentry) = dir.next_entry().await? {
|
||||
let file_name = dentry.file_name().to_owned();
|
||||
let basename = file_name.to_string_lossy();
|
||||
let seq_part = if let Some(m) = list_name_pattern.captures(&basename) {
|
||||
m.get(1)
|
||||
.expect("Non optional group should be present")
|
||||
.as_str()
|
||||
} else {
|
||||
warn!("Unexpected key in deletion queue: {basename}");
|
||||
continue;
|
||||
};
|
||||
|
||||
let seq: u64 = match u64::from_str_radix(seq_part, 16) {
|
||||
Ok(s) => s,
|
||||
Err(e) => {
|
||||
warn!("Malformed key '{basename}': {e}");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
seqs.push(seq);
|
||||
}
|
||||
seqs.sort();
|
||||
|
||||
// Initialize the next sequence number in the frontend based on the maximum of the highest list we see,
|
||||
// and the last list that was deleted according to the header. Combined with writing out the header
|
||||
// prior to deletions, this guarnatees no re-use of sequence numbers.
|
||||
if let Some(max_list_seq) = seqs.last() {
|
||||
self.pending.sequence = std::cmp::max(self.pending.sequence, max_list_seq + 1);
|
||||
}
|
||||
|
||||
for s in seqs {
|
||||
let list_path = self.conf.deletion_list_path(s);
|
||||
let list_bytes = tokio::fs::read(&list_path).await?;
|
||||
|
||||
let deletion_list = match serde_json::from_slice::<DeletionList>(&list_bytes) {
|
||||
Ok(l) => l,
|
||||
Err(e) => {
|
||||
// Drop the list on the floor: any objects it referenced will be left behind
|
||||
// for scrubbing to clean up. This should never happen unless we have a serialization bug.
|
||||
warn!(sequence = s, "Failed to deserialize deletion list: {e}");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
// We will drop out of recovery if this fails: it indicates that we are shutting down
|
||||
// or the backend has panicked
|
||||
DELETION_QUEUE_SUBMITTED.inc_by(deletion_list.len() as u64);
|
||||
self.tx
|
||||
.send(BackendQueueMessage::Delete(deletion_list))
|
||||
.await?;
|
||||
}
|
||||
|
||||
info!(next_sequence = self.pending.sequence, "Replay complete");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// This is the front-end ingest, where we bundle up deletion requests into DeletionList
|
||||
/// and write them out, for later
|
||||
pub async fn background(&mut self) {
|
||||
info!("Started deletion frontend worker");
|
||||
|
||||
let mut recovered: bool = false;
|
||||
|
||||
while !self.cancel.is_cancelled() {
|
||||
let timeout = if self.pending_flushes.is_empty() {
|
||||
FRONTEND_DEFAULT_TIMEOUT
|
||||
} else {
|
||||
FRONTEND_FLUSHING_TIMEOUT
|
||||
};
|
||||
|
||||
let msg = match tokio::time::timeout(timeout, self.rx.recv()).await {
|
||||
Ok(Some(msg)) => msg,
|
||||
Ok(None) => {
|
||||
// Queue sender destroyed, shutting down
|
||||
break;
|
||||
}
|
||||
Err(_) => {
|
||||
// Hit deadline, flush.
|
||||
self.flush().await;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
// On first message, do recovery. This avoids unnecessary recovery very
|
||||
// early in startup, and simplifies testing by avoiding a 404 reading the
|
||||
// header on every first pageserver startup.
|
||||
if !recovered {
|
||||
// Before accepting any input from this pageserver lifetime, recover all deletion lists that are in S3
|
||||
if let Err(e) = self.recover().await {
|
||||
// This should only happen in truly unrecoverable cases, like the recovery finding that the backend
|
||||
// queue receiver has been dropped.
|
||||
info!("Deletion queue recover aborted, deletion queue will not proceed ({e})");
|
||||
return;
|
||||
} else {
|
||||
recovered = true;
|
||||
}
|
||||
}
|
||||
|
||||
match msg {
|
||||
FrontendQueueMessage::Delete(op) => {
|
||||
debug!(
|
||||
"Delete: ingesting {0} layers, {1} other objects",
|
||||
op.layers.len(),
|
||||
op.objects.len()
|
||||
);
|
||||
|
||||
let mut layer_paths = Vec::new();
|
||||
for (layer, generation) in op.layers {
|
||||
layer_paths.push(remote_layer_path(
|
||||
&op.tenant_id,
|
||||
&op.timeline_id,
|
||||
&layer,
|
||||
generation,
|
||||
));
|
||||
}
|
||||
layer_paths.extend(op.objects);
|
||||
|
||||
if self.pending.push(
|
||||
&op.tenant_id,
|
||||
&op.timeline_id,
|
||||
op.generation,
|
||||
&mut layer_paths,
|
||||
) == false
|
||||
{
|
||||
self.flush().await;
|
||||
let retry = self.pending.push(
|
||||
&op.tenant_id,
|
||||
&op.timeline_id,
|
||||
op.generation,
|
||||
&mut layer_paths,
|
||||
);
|
||||
if retry != true {
|
||||
// Unexpeted: after we flush, we should have
|
||||
// drained self.pending, so a conflict on
|
||||
// generation numbers should be impossible.
|
||||
tracing::error!(
|
||||
"Failed to enqueue deletions, leaking objects. This is a bug."
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
FrontendQueueMessage::Flush(op) => {
|
||||
if self.pending.is_empty() {
|
||||
// Execute immediately
|
||||
debug!("Flush: No pending objects, flushing immediately");
|
||||
op.fire()
|
||||
} else {
|
||||
// Execute next time we flush
|
||||
debug!("Flush: adding to pending flush list for next deadline flush");
|
||||
self.pending_flushes.push(op);
|
||||
}
|
||||
}
|
||||
FrontendQueueMessage::FlushExecute(op) => {
|
||||
debug!("FlushExecute: passing through to backend");
|
||||
// We do not flush to a deletion list here: the client sends a Flush before the FlushExecute
|
||||
if let Err(e) = self.tx.send(BackendQueueMessage::Flush(op)).await {
|
||||
info!("Can't flush, shutting down ({e})");
|
||||
// Caller will get error when their oneshot sender was dropped.
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if self.pending.len() > DELETION_LIST_TARGET_SIZE || !self.pending_flushes.is_empty() {
|
||||
self.flush().await;
|
||||
}
|
||||
}
|
||||
info!("Deletion queue shut down.");
|
||||
}
|
||||
}
|
||||
@@ -52,6 +52,29 @@ paths:
|
||||
schema:
|
||||
type: object
|
||||
|
||||
/v1/deletion_queue/flush:
|
||||
parameters:
|
||||
- name: execute
|
||||
in: query
|
||||
required: false
|
||||
schema:
|
||||
type: boolean
|
||||
description:
|
||||
If true, attempt to execute deletions. If false, just flush deletions to persistent deletion lists.
|
||||
put:
|
||||
description: Execute any deletions currently enqueued
|
||||
security: []
|
||||
responses:
|
||||
"200":
|
||||
description: |
|
||||
Flush completed: if execute was true, then enqueued deletions have been completed. If execute was false,
|
||||
then enqueued deletions have been persisted to deletion lists, and may have been completed.
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
type: object
|
||||
|
||||
|
||||
/v1/tenant/{tenant_id}:
|
||||
parameters:
|
||||
- name: tenant_id
|
||||
@@ -383,7 +406,6 @@ paths:
|
||||
schema:
|
||||
type: string
|
||||
format: hex
|
||||
|
||||
post:
|
||||
description: |
|
||||
Schedules attach operation to happen in the background for the given tenant.
|
||||
@@ -1020,6 +1042,9 @@ components:
|
||||
properties:
|
||||
config:
|
||||
$ref: '#/components/schemas/TenantConfig'
|
||||
generation:
|
||||
type: integer
|
||||
description: Attachment generation number.
|
||||
TenantConfigRequest:
|
||||
allOf:
|
||||
- $ref: '#/components/schemas/TenantConfig'
|
||||
|
||||
@@ -23,6 +23,7 @@ use super::models::{
|
||||
TimelineCreateRequest, TimelineGcRequest, TimelineInfo,
|
||||
};
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::deletion_queue::{DeletionQueue, DeletionQueueError};
|
||||
use crate::metrics::{StorageTimeOperation, STORAGE_TIME_GLOBAL};
|
||||
use crate::pgdatadir_mapping::LsnForTimestamp;
|
||||
use crate::task_mgr::TaskKind;
|
||||
@@ -32,11 +33,13 @@ use crate::tenant::mgr::{
|
||||
};
|
||||
use crate::tenant::size::ModelInputs;
|
||||
use crate::tenant::storage_layer::LayerAccessStatsReset;
|
||||
use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError, Timeline};
|
||||
use crate::tenant::timeline::Timeline;
|
||||
use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError};
|
||||
use crate::{config::PageServerConf, tenant::mgr};
|
||||
use crate::{disk_usage_eviction_task, tenant};
|
||||
use utils::{
|
||||
auth::JwtAuth,
|
||||
generation::Generation,
|
||||
http::{
|
||||
endpoint::{self, attach_openapi_ui, auth_middleware, check_permission_with},
|
||||
error::{ApiError, HttpErrorBody},
|
||||
@@ -56,6 +59,7 @@ struct State {
|
||||
auth: Option<Arc<JwtAuth>>,
|
||||
allowlist_routes: Vec<Uri>,
|
||||
remote_storage: Option<GenericRemoteStorage>,
|
||||
deletion_queue: DeletionQueue,
|
||||
broker_client: storage_broker::BrokerClientChannel,
|
||||
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
|
||||
}
|
||||
@@ -65,6 +69,7 @@ impl State {
|
||||
conf: &'static PageServerConf,
|
||||
auth: Option<Arc<JwtAuth>>,
|
||||
remote_storage: Option<GenericRemoteStorage>,
|
||||
deletion_queue: DeletionQueue,
|
||||
broker_client: storage_broker::BrokerClientChannel,
|
||||
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
|
||||
) -> anyhow::Result<Self> {
|
||||
@@ -78,6 +83,7 @@ impl State {
|
||||
allowlist_routes,
|
||||
remote_storage,
|
||||
broker_client,
|
||||
deletion_queue,
|
||||
disk_usage_eviction_state,
|
||||
})
|
||||
}
|
||||
@@ -472,7 +478,7 @@ async fn tenant_attach_handler(
|
||||
check_permission(&request, Some(tenant_id))?;
|
||||
|
||||
let maybe_body: Option<TenantAttachRequest> = json_request_or_empty_body(&mut request).await?;
|
||||
let tenant_conf = match maybe_body {
|
||||
let tenant_conf = match &maybe_body {
|
||||
Some(request) => TenantConfOpt::try_from(&*request.config).map_err(ApiError::BadRequest)?,
|
||||
None => TenantConfOpt::default(),
|
||||
};
|
||||
@@ -483,13 +489,30 @@ async fn tenant_attach_handler(
|
||||
|
||||
let state = get_state(&request);
|
||||
|
||||
let generation = if state.conf.control_plane_api.is_some() {
|
||||
// If we have been configured with a control plane URI, then generations are
|
||||
// mandatory, as we will attempt to re-attach on startup.
|
||||
maybe_body
|
||||
.as_ref()
|
||||
.map(|tar| tar.generation)
|
||||
.flatten()
|
||||
.map(|g| Generation::new(g))
|
||||
.ok_or(ApiError::BadRequest(anyhow!(
|
||||
"generation attribute missing"
|
||||
)))?
|
||||
} else {
|
||||
Generation::none()
|
||||
};
|
||||
|
||||
if let Some(remote_storage) = &state.remote_storage {
|
||||
mgr::attach_tenant(
|
||||
state.conf,
|
||||
tenant_id,
|
||||
generation,
|
||||
tenant_conf,
|
||||
state.broker_client.clone(),
|
||||
remote_storage.clone(),
|
||||
&state.deletion_queue,
|
||||
&ctx,
|
||||
)
|
||||
.instrument(info_span!("tenant_attach", %tenant_id))
|
||||
@@ -552,6 +575,7 @@ async fn tenant_load_handler(
|
||||
tenant_id,
|
||||
state.broker_client.clone(),
|
||||
state.remote_storage.clone(),
|
||||
&state.deletion_queue,
|
||||
&ctx,
|
||||
)
|
||||
.instrument(info_span!("load", %tenant_id))
|
||||
@@ -867,6 +891,12 @@ async fn tenant_create_handler(
|
||||
let tenant_conf =
|
||||
TenantConfOpt::try_from(&request_data.config).map_err(ApiError::BadRequest)?;
|
||||
|
||||
// TODO: make generation mandatory here once control plane supports it.
|
||||
let generation = request_data
|
||||
.generation
|
||||
.map(|g| Generation::new(g))
|
||||
.unwrap_or(Generation::none());
|
||||
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
|
||||
|
||||
let state = get_state(&request);
|
||||
@@ -875,8 +905,10 @@ async fn tenant_create_handler(
|
||||
state.conf,
|
||||
tenant_conf,
|
||||
target_tenant_id,
|
||||
generation,
|
||||
state.broker_client.clone(),
|
||||
state.remote_storage.clone(),
|
||||
&state.deletion_queue,
|
||||
&ctx,
|
||||
)
|
||||
.instrument(info_span!("tenant_create", tenant_id = %target_tenant_id))
|
||||
@@ -1117,6 +1149,48 @@ async fn always_panic_handler(
|
||||
json_response(StatusCode::NO_CONTENT, ())
|
||||
}
|
||||
|
||||
async fn deletion_queue_flush(
|
||||
r: Request<Body>,
|
||||
cancel: CancellationToken,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let state = get_state(&r);
|
||||
|
||||
if state.remote_storage.is_none() {
|
||||
// Nothing to do if remote storage is disabled.
|
||||
return json_response(StatusCode::OK, ());
|
||||
}
|
||||
|
||||
let execute = parse_query_param(&r, "execute")?.unwrap_or(false);
|
||||
|
||||
let queue_client = state.deletion_queue.new_client();
|
||||
|
||||
tokio::select! {
|
||||
flush_result = async {
|
||||
if execute {
|
||||
queue_client.flush_execute().await
|
||||
} else {
|
||||
queue_client.flush().await
|
||||
}
|
||||
} => {
|
||||
match flush_result {
|
||||
Ok(())=> {
|
||||
json_response(StatusCode::OK, ())
|
||||
},
|
||||
Err(e) => {
|
||||
match e {
|
||||
DeletionQueueError::ShuttingDown => {
|
||||
Err(ApiError::ShuttingDown)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
_ = cancel.cancelled() => {
|
||||
Err(ApiError::ShuttingDown)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn disk_usage_eviction_run(
|
||||
mut r: Request<Body>,
|
||||
_cancel: CancellationToken,
|
||||
@@ -1326,6 +1400,7 @@ pub fn make_router(
|
||||
auth: Option<Arc<JwtAuth>>,
|
||||
broker_client: BrokerClientChannel,
|
||||
remote_storage: Option<GenericRemoteStorage>,
|
||||
deletion_queue: DeletionQueue,
|
||||
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
|
||||
) -> anyhow::Result<RouterBuilder<hyper::Body, ApiError>> {
|
||||
let spec = include_bytes!("openapi_spec.yml");
|
||||
@@ -1355,6 +1430,7 @@ pub fn make_router(
|
||||
conf,
|
||||
auth,
|
||||
remote_storage,
|
||||
deletion_queue,
|
||||
broker_client,
|
||||
disk_usage_eviction_state,
|
||||
)
|
||||
@@ -1439,6 +1515,9 @@ pub fn make_router(
|
||||
.put("/v1/disk_usage_eviction/run", |r| {
|
||||
api_handler(r, disk_usage_eviction_run)
|
||||
})
|
||||
.put("/v1/deletion_queue/flush", |r| {
|
||||
api_handler(r, deletion_queue_flush)
|
||||
})
|
||||
.put("/v1/tenant/:tenant_id/break", |r| {
|
||||
testing_api_handler("set tenant state to broken", r, handle_tenant_break)
|
||||
})
|
||||
|
||||
@@ -3,6 +3,7 @@ pub mod basebackup;
|
||||
pub mod config;
|
||||
pub mod consumption_metrics;
|
||||
pub mod context;
|
||||
pub mod deletion_queue;
|
||||
pub mod disk_usage_eviction_task;
|
||||
pub mod http;
|
||||
pub mod import_datadir;
|
||||
|
||||
@@ -795,6 +795,31 @@ static REMOTE_TIMELINE_CLIENT_CALLS_STARTED_HIST: Lazy<HistogramVec> = Lazy::new
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static DELETION_QUEUE_SUBMITTED: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!(
|
||||
"pageserver_deletion_queue_submitted_total",
|
||||
"Number of objects submitted for deletion"
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static DELETION_QUEUE_EXECUTED: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!(
|
||||
"pageserver_deletion_queue_executed_total",
|
||||
"Number of objects deleted"
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static DELETION_QUEUE_ERRORS: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"pageserver_deletion_queue_errors_total",
|
||||
"Incremented on retryable remote I/O errors writing deletion lists or executing deletions.",
|
||||
&["op_kind"],
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
static REMOTE_TIMELINE_CLIENT_BYTES_STARTED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"pageserver_remote_timeline_client_bytes_started",
|
||||
|
||||
@@ -59,6 +59,7 @@ use self::timeline::EvictionTaskTenantState;
|
||||
use self::timeline::TimelineResources;
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::deletion_queue::DeletionQueueClient;
|
||||
use crate::import_datadir;
|
||||
use crate::is_uninit_mark;
|
||||
use crate::metrics::TENANT_ACTIVATION;
|
||||
@@ -85,6 +86,7 @@ pub use pageserver_api::models::TenantState;
|
||||
use toml_edit;
|
||||
use utils::{
|
||||
crashsafe,
|
||||
generation::Generation,
|
||||
id::{TenantId, TimelineId},
|
||||
lsn::{Lsn, RecordLsn},
|
||||
};
|
||||
@@ -119,7 +121,7 @@ mod span;
|
||||
|
||||
pub mod metadata;
|
||||
mod par_fsync;
|
||||
mod remote_timeline_client;
|
||||
pub mod remote_timeline_client;
|
||||
pub mod storage_layer;
|
||||
|
||||
pub mod config;
|
||||
@@ -156,6 +158,7 @@ pub const TENANT_DELETED_MARKER_FILE_NAME: &str = "deleted";
|
||||
pub struct TenantSharedResources {
|
||||
pub broker_client: storage_broker::BrokerClientChannel,
|
||||
pub remote_storage: Option<GenericRemoteStorage>,
|
||||
pub deletion_queue_client: DeletionQueueClient,
|
||||
}
|
||||
|
||||
///
|
||||
@@ -178,6 +181,10 @@ pub struct Tenant {
|
||||
tenant_conf: Arc<RwLock<TenantConfOpt>>,
|
||||
|
||||
tenant_id: TenantId,
|
||||
|
||||
// The remote storage generation, used to protect S3 objects from split-brain
|
||||
generation: Generation,
|
||||
|
||||
timelines: Mutex<HashMap<TimelineId, Arc<Timeline>>>,
|
||||
// This mutex prevents creation of new timelines during GC.
|
||||
// Adding yet another mutex (in addition to `timelines`) is needed because holding
|
||||
@@ -191,6 +198,9 @@ pub struct Tenant {
|
||||
// provides access to timeline data sitting in the remote storage
|
||||
remote_storage: Option<GenericRemoteStorage>,
|
||||
|
||||
// Access to global deletion queue for when this tenant wants to schedule a deletion
|
||||
deletion_queue_client: Option<DeletionQueueClient>,
|
||||
|
||||
/// Cached logical sizes updated updated on each [`Tenant::gather_size_inputs`].
|
||||
cached_logical_sizes: tokio::sync::Mutex<HashMap<(TimelineId, Lsn), u64>>,
|
||||
cached_synthetic_tenant_size: Arc<AtomicU64>,
|
||||
@@ -522,9 +532,11 @@ impl Tenant {
|
||||
pub(crate) fn spawn_attach(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_id: TenantId,
|
||||
generation: Generation,
|
||||
broker_client: storage_broker::BrokerClientChannel,
|
||||
tenants: &'static tokio::sync::RwLock<TenantsMap>,
|
||||
remote_storage: GenericRemoteStorage,
|
||||
deletion_queue_client: DeletionQueueClient,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Arc<Tenant>> {
|
||||
// TODO dedup with spawn_load
|
||||
@@ -538,7 +550,9 @@ impl Tenant {
|
||||
tenant_conf,
|
||||
wal_redo_manager,
|
||||
tenant_id,
|
||||
generation,
|
||||
Some(remote_storage.clone()),
|
||||
Some(deletion_queue_client),
|
||||
));
|
||||
|
||||
// Do all the hard work in the background
|
||||
@@ -648,12 +662,8 @@ impl Tenant {
|
||||
.as_ref()
|
||||
.ok_or_else(|| anyhow::anyhow!("cannot attach without remote storage"))?;
|
||||
|
||||
let remote_timeline_ids = remote_timeline_client::list_remote_timelines(
|
||||
remote_storage,
|
||||
self.conf,
|
||||
self.tenant_id,
|
||||
)
|
||||
.await?;
|
||||
let remote_timeline_ids =
|
||||
remote_timeline_client::list_remote_timelines(remote_storage, self.tenant_id).await?;
|
||||
|
||||
info!("found {} timelines", remote_timeline_ids.len());
|
||||
|
||||
@@ -665,6 +675,7 @@ impl Tenant {
|
||||
self.conf,
|
||||
self.tenant_id,
|
||||
timeline_id,
|
||||
self.generation,
|
||||
);
|
||||
part_downloads.spawn(
|
||||
async move {
|
||||
@@ -727,6 +738,7 @@ impl Tenant {
|
||||
remote_metadata,
|
||||
TimelineResources {
|
||||
remote_client: Some(remote_client),
|
||||
deletion_queue_client: self.deletion_queue_client.clone(),
|
||||
},
|
||||
ctx,
|
||||
)
|
||||
@@ -751,6 +763,7 @@ impl Tenant {
|
||||
timeline_id,
|
||||
&index_part.metadata,
|
||||
Some(remote_timeline_client),
|
||||
self.deletion_queue_client.clone(),
|
||||
None,
|
||||
)
|
||||
.await
|
||||
@@ -851,6 +864,8 @@ impl Tenant {
|
||||
TenantConfOpt::default(),
|
||||
wal_redo_manager,
|
||||
tenant_id,
|
||||
Generation::broken(),
|
||||
None,
|
||||
None,
|
||||
))
|
||||
}
|
||||
@@ -868,6 +883,7 @@ impl Tenant {
|
||||
pub(crate) fn spawn_load(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_id: TenantId,
|
||||
generation: Generation,
|
||||
resources: TenantSharedResources,
|
||||
init_order: Option<InitializationOrder>,
|
||||
tenants: &'static tokio::sync::RwLock<TenantsMap>,
|
||||
@@ -885,6 +901,7 @@ impl Tenant {
|
||||
|
||||
let broker_client = resources.broker_client;
|
||||
let remote_storage = resources.remote_storage;
|
||||
let deletion_queue_client = resources.deletion_queue_client;
|
||||
|
||||
let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenant_id));
|
||||
let tenant = Tenant::new(
|
||||
@@ -893,7 +910,9 @@ impl Tenant {
|
||||
tenant_conf,
|
||||
wal_redo_manager,
|
||||
tenant_id,
|
||||
generation,
|
||||
remote_storage.clone(),
|
||||
Some(deletion_queue_client),
|
||||
);
|
||||
let tenant = Arc::new(tenant);
|
||||
|
||||
@@ -1301,6 +1320,7 @@ impl Tenant {
|
||||
timeline_id,
|
||||
&local_metadata,
|
||||
Some(remote_client),
|
||||
self.deletion_queue_client.clone(),
|
||||
init_order,
|
||||
)
|
||||
.await
|
||||
@@ -1350,6 +1370,7 @@ impl Tenant {
|
||||
timeline_id,
|
||||
&local_metadata,
|
||||
None,
|
||||
None,
|
||||
init_order,
|
||||
)
|
||||
.await
|
||||
@@ -2274,6 +2295,7 @@ impl Tenant {
|
||||
ancestor,
|
||||
new_timeline_id,
|
||||
self.tenant_id,
|
||||
self.generation,
|
||||
Arc::clone(&self.walredo_mgr),
|
||||
resources,
|
||||
pg_version,
|
||||
@@ -2291,8 +2313,18 @@ impl Tenant {
|
||||
tenant_conf: TenantConfOpt,
|
||||
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
|
||||
tenant_id: TenantId,
|
||||
generation: Generation,
|
||||
remote_storage: Option<GenericRemoteStorage>,
|
||||
deletion_queue_client: Option<DeletionQueueClient>,
|
||||
) -> Tenant {
|
||||
#[cfg(not(test))]
|
||||
match state {
|
||||
TenantState::Broken { .. } => {}
|
||||
_ => {
|
||||
// Non-broken tenants must be constructed with a deletion queue
|
||||
assert!(deletion_queue_client.is_some());
|
||||
}
|
||||
}
|
||||
let (state, mut rx) = watch::channel(state);
|
||||
|
||||
tokio::spawn(async move {
|
||||
@@ -2349,6 +2381,7 @@ impl Tenant {
|
||||
|
||||
Tenant {
|
||||
tenant_id,
|
||||
generation,
|
||||
conf,
|
||||
// using now here is good enough approximation to catch tenants with really long
|
||||
// activation times.
|
||||
@@ -2358,6 +2391,7 @@ impl Tenant {
|
||||
gc_cs: tokio::sync::Mutex::new(()),
|
||||
walredo_mgr,
|
||||
remote_storage,
|
||||
deletion_queue_client,
|
||||
state,
|
||||
cached_logical_sizes: tokio::sync::Mutex::new(HashMap::new()),
|
||||
cached_synthetic_tenant_size: Arc::new(AtomicU64::new(0)),
|
||||
@@ -2931,13 +2965,17 @@ impl Tenant {
|
||||
self.conf,
|
||||
self.tenant_id,
|
||||
timeline_id,
|
||||
self.generation,
|
||||
);
|
||||
Some(remote_client)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
TimelineResources { remote_client }
|
||||
TimelineResources {
|
||||
remote_client,
|
||||
deletion_queue_client: self.deletion_queue_client.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates intermediate timeline structure and its files.
|
||||
@@ -3454,6 +3492,7 @@ pub mod harness {
|
||||
pub conf: &'static PageServerConf,
|
||||
pub tenant_conf: TenantConf,
|
||||
pub tenant_id: TenantId,
|
||||
pub generation: Generation,
|
||||
}
|
||||
|
||||
static LOG_HANDLE: OnceCell<()> = OnceCell::new();
|
||||
@@ -3495,13 +3534,14 @@ pub mod harness {
|
||||
conf,
|
||||
tenant_conf,
|
||||
tenant_id,
|
||||
generation: Generation::new(0xdeadbeef),
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn load(&self) -> (Arc<Tenant>, RequestContext) {
|
||||
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
|
||||
(
|
||||
self.try_load(&ctx, None)
|
||||
self.try_load(&ctx, None, None)
|
||||
.await
|
||||
.expect("failed to load test tenant"),
|
||||
ctx,
|
||||
@@ -3512,6 +3552,7 @@ pub mod harness {
|
||||
&self,
|
||||
ctx: &RequestContext,
|
||||
remote_storage: Option<remote_storage::GenericRemoteStorage>,
|
||||
deletion_queue_client: Option<DeletionQueueClient>,
|
||||
) -> anyhow::Result<Arc<Tenant>> {
|
||||
let walredo_mgr = Arc::new(TestRedoManager);
|
||||
|
||||
@@ -3521,7 +3562,9 @@ pub mod harness {
|
||||
TenantConfOpt::from(self.tenant_conf),
|
||||
walredo_mgr,
|
||||
self.tenant_id,
|
||||
self.generation,
|
||||
remote_storage,
|
||||
deletion_queue_client,
|
||||
));
|
||||
tenant
|
||||
.load(None, ctx)
|
||||
@@ -4086,7 +4129,7 @@ mod tests {
|
||||
std::fs::write(metadata_path, metadata_bytes)?;
|
||||
|
||||
let err = harness
|
||||
.try_load(&ctx, None)
|
||||
.try_load(&ctx, None, None)
|
||||
.await
|
||||
.err()
|
||||
.expect("should fail");
|
||||
|
||||
@@ -1,10 +1,13 @@
|
||||
//! This module acts as a switchboard to access different repositories managed by this
|
||||
//! page server.
|
||||
|
||||
use hyper::StatusCode;
|
||||
use pageserver_api::control_api::{HexTenantId, ReAttachRequest, ReAttachResponse};
|
||||
use std::collections::{hash_map, HashMap};
|
||||
use std::ffi::OsStr;
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::fs;
|
||||
|
||||
use anyhow::Context;
|
||||
@@ -18,6 +21,7 @@ use utils::crashsafe;
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::deletion_queue::DeletionQueue;
|
||||
use crate::task_mgr::{self, TaskKind};
|
||||
use crate::tenant::config::TenantConfOpt;
|
||||
use crate::tenant::delete::DeleteTenantFlow;
|
||||
@@ -25,6 +29,7 @@ use crate::tenant::{create_tenant_files, CreateTenantFilesMode, Tenant, TenantSt
|
||||
use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME};
|
||||
|
||||
use utils::fs_ext::PathExt;
|
||||
use utils::generation::Generation;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
|
||||
use super::delete::DeleteTenantError;
|
||||
@@ -75,6 +80,78 @@ pub async fn init_tenant_mgr(
|
||||
|
||||
let mut tenants = HashMap::new();
|
||||
|
||||
// If we are configured to use the control plane API, then it is the source of truth for what to attach
|
||||
let tenant_generations = conf
|
||||
.control_plane_api
|
||||
.as_ref()
|
||||
.map(|control_plane_api| async {
|
||||
let client = reqwest::ClientBuilder::new()
|
||||
.build()
|
||||
.expect("Failed to construct http client");
|
||||
|
||||
// FIXME: it's awkward that join() requires the base to have a trailing slash, makes
|
||||
// it easy to get a config wrong
|
||||
assert!(
|
||||
control_plane_api.as_str().ends_with("/"),
|
||||
"control plane API needs trailing slash"
|
||||
);
|
||||
|
||||
let re_attach_path = control_plane_api
|
||||
.join("re-attach")
|
||||
.expect("Failed to build re-attach path");
|
||||
let request = ReAttachRequest { node_id: conf.id };
|
||||
|
||||
// TODO: we should have been passed a cancellation token, and use it to end
|
||||
// this loop gracefully
|
||||
loop {
|
||||
let response = match client
|
||||
.post(re_attach_path.clone())
|
||||
.json(&request)
|
||||
.send()
|
||||
.await
|
||||
{
|
||||
Err(e) => Err(anyhow::Error::from(e)),
|
||||
Ok(r) => {
|
||||
if r.status() == StatusCode::OK {
|
||||
r.json::<ReAttachResponse>()
|
||||
.await
|
||||
.map_err(|e| anyhow::Error::from(e))
|
||||
} else {
|
||||
Err(anyhow::anyhow!("Unexpected status {}", r.status()))
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
match response {
|
||||
Ok(res) => {
|
||||
tracing::info!(
|
||||
"Received re-attach response with {0} tenants",
|
||||
res.tenants.len()
|
||||
);
|
||||
|
||||
// TODO: do something with it
|
||||
break res
|
||||
.tenants
|
||||
.into_iter()
|
||||
.map(|t| (t.id, t.generation))
|
||||
.collect::<HashMap<_, _>>();
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!("Error re-attaching tenants, retrying: {e:#}");
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let tenant_generations = match tenant_generations {
|
||||
Some(g) => Some(g.await),
|
||||
None => {
|
||||
info!("Control plane API not configured, tenant generations are disabled");
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
let mut dir_entries = fs::read_dir(&tenants_dir)
|
||||
.await
|
||||
.with_context(|| format!("Failed to list tenants dir {tenants_dir:?}"))?;
|
||||
@@ -122,9 +199,53 @@ pub async fn init_tenant_mgr(
|
||||
continue;
|
||||
}
|
||||
|
||||
let tenant_id = match tenant_dir_path
|
||||
.file_name()
|
||||
.and_then(OsStr::to_str)
|
||||
.unwrap_or_default()
|
||||
.parse::<TenantId>()
|
||||
{
|
||||
Ok(id) => id,
|
||||
Err(_) => {
|
||||
warn!(
|
||||
"Invalid tenant path (garbage in our repo directory?): {0}",
|
||||
tenant_dir_path.display()
|
||||
);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let generation = if let Some(generations) = &tenant_generations {
|
||||
// We have a generation map: treat it as the authority for whether
|
||||
// this tenant is really attached.
|
||||
if let Some(gen) = generations.get(&HexTenantId::new(tenant_id)) {
|
||||
Generation::new(*gen)
|
||||
} else {
|
||||
info!("Detaching tenant {0}, control plane omitted it in re-attach response", tenant_id);
|
||||
if let Err(e) = fs::remove_dir_all(&tenant_dir_path).await {
|
||||
error!(
|
||||
"Failed to remove detached tenant directory '{}': {:?}",
|
||||
tenant_dir_path.display(),
|
||||
e
|
||||
);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
// Legacy mode: no generation information, any tenant present
|
||||
// on local disk may activate
|
||||
info!(
|
||||
"Starting tenant {0} in legacy mode, no generation",
|
||||
tenant_dir_path.display()
|
||||
);
|
||||
Generation::none()
|
||||
};
|
||||
|
||||
match schedule_local_tenant_processing(
|
||||
conf,
|
||||
tenant_id,
|
||||
&tenant_dir_path,
|
||||
generation,
|
||||
resources.clone(),
|
||||
Some(init_order.clone()),
|
||||
&TENANTS,
|
||||
@@ -160,7 +281,9 @@ pub async fn init_tenant_mgr(
|
||||
|
||||
pub(crate) fn schedule_local_tenant_processing(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_id: TenantId,
|
||||
tenant_path: &Path,
|
||||
generation: Generation,
|
||||
resources: TenantSharedResources,
|
||||
init_order: Option<InitializationOrder>,
|
||||
tenants: &'static tokio::sync::RwLock<TenantsMap>,
|
||||
@@ -181,15 +304,6 @@ pub(crate) fn schedule_local_tenant_processing(
|
||||
"Cannot load tenant from empty directory {tenant_path:?}"
|
||||
);
|
||||
|
||||
let tenant_id = tenant_path
|
||||
.file_name()
|
||||
.and_then(OsStr::to_str)
|
||||
.unwrap_or_default()
|
||||
.parse::<TenantId>()
|
||||
.with_context(|| {
|
||||
format!("Could not parse tenant id out of the tenant dir name in path {tenant_path:?}")
|
||||
})?;
|
||||
|
||||
let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_id);
|
||||
anyhow::ensure!(
|
||||
!conf.tenant_ignore_mark_file_path(&tenant_id).exists(),
|
||||
@@ -202,9 +316,11 @@ pub(crate) fn schedule_local_tenant_processing(
|
||||
match Tenant::spawn_attach(
|
||||
conf,
|
||||
tenant_id,
|
||||
generation,
|
||||
resources.broker_client,
|
||||
tenants,
|
||||
remote_storage,
|
||||
resources.deletion_queue_client,
|
||||
ctx,
|
||||
) {
|
||||
Ok(tenant) => tenant,
|
||||
@@ -224,7 +340,9 @@ pub(crate) fn schedule_local_tenant_processing(
|
||||
} else {
|
||||
info!("tenant {tenant_id} is assumed to be loadable, starting load operation");
|
||||
// Start loading the tenant into memory. It will initially be in Loading state.
|
||||
Tenant::spawn_load(conf, tenant_id, resources, init_order, tenants, ctx)
|
||||
Tenant::spawn_load(
|
||||
conf, tenant_id, generation, resources, init_order, tenants, ctx,
|
||||
)
|
||||
};
|
||||
Ok(tenant)
|
||||
}
|
||||
@@ -347,8 +465,10 @@ pub async fn create_tenant(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_conf: TenantConfOpt,
|
||||
tenant_id: TenantId,
|
||||
generation: Generation,
|
||||
broker_client: storage_broker::BrokerClientChannel,
|
||||
remote_storage: Option<GenericRemoteStorage>,
|
||||
deletion_queue: &DeletionQueue,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Arc<Tenant>, TenantMapInsertError> {
|
||||
tenant_map_insert(tenant_id, || {
|
||||
@@ -362,9 +482,11 @@ pub async fn create_tenant(
|
||||
let tenant_resources = TenantSharedResources {
|
||||
broker_client,
|
||||
remote_storage,
|
||||
deletion_queue_client: deletion_queue.new_client(),
|
||||
};
|
||||
let created_tenant =
|
||||
schedule_local_tenant_processing(conf, &tenant_directory, tenant_resources, None, &TENANTS, ctx)?;
|
||||
schedule_local_tenant_processing(conf, tenant_id, &tenant_directory,
|
||||
generation, tenant_resources, None, &TENANTS, ctx)?;
|
||||
// TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
|
||||
// See https://github.com/neondatabase/neon/issues/4233
|
||||
|
||||
@@ -513,6 +635,7 @@ pub async fn load_tenant(
|
||||
tenant_id: TenantId,
|
||||
broker_client: storage_broker::BrokerClientChannel,
|
||||
remote_storage: Option<GenericRemoteStorage>,
|
||||
deletion_queue: &DeletionQueue,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), TenantMapInsertError> {
|
||||
tenant_map_insert(tenant_id, || {
|
||||
@@ -526,8 +649,11 @@ pub async fn load_tenant(
|
||||
let resources = TenantSharedResources {
|
||||
broker_client,
|
||||
remote_storage,
|
||||
deletion_queue_client: deletion_queue.new_client(),
|
||||
};
|
||||
let new_tenant = schedule_local_tenant_processing(conf, &tenant_path, resources, None, &TENANTS, ctx)
|
||||
// TODO: remove the `/load` API once generation support is complete:
|
||||
// it becomes equivalent to attaching.
|
||||
let new_tenant = schedule_local_tenant_processing(conf, tenant_id, &tenant_path, Generation::none(), resources, None, &TENANTS, ctx)
|
||||
.with_context(|| {
|
||||
format!("Failed to schedule tenant processing in path {tenant_path:?}")
|
||||
})?;
|
||||
@@ -591,9 +717,11 @@ pub async fn list_tenants() -> Result<Vec<(TenantId, TenantState)>, TenantMapLis
|
||||
pub async fn attach_tenant(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_id: TenantId,
|
||||
generation: Generation,
|
||||
tenant_conf: TenantConfOpt,
|
||||
broker_client: storage_broker::BrokerClientChannel,
|
||||
remote_storage: GenericRemoteStorage,
|
||||
deletion_queue: &DeletionQueue,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), TenantMapInsertError> {
|
||||
tenant_map_insert(tenant_id, || {
|
||||
@@ -611,8 +739,9 @@ pub async fn attach_tenant(
|
||||
let resources = TenantSharedResources {
|
||||
broker_client,
|
||||
remote_storage: Some(remote_storage),
|
||||
deletion_queue_client: deletion_queue.new_client(),
|
||||
};
|
||||
let attached_tenant = schedule_local_tenant_processing(conf, &tenant_dir, resources, None, &TENANTS, ctx)?;
|
||||
let attached_tenant = schedule_local_tenant_processing(conf, tenant_id, &tenant_dir, generation, resources, None, &TENANTS, ctx)?;
|
||||
// TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
|
||||
// See https://github.com/neondatabase/neon/issues/4233
|
||||
|
||||
|
||||
@@ -56,9 +56,11 @@
|
||||
//! # Consistency
|
||||
//!
|
||||
//! To have a consistent remote structure, it's important that uploads and
|
||||
//! deletions are performed in the right order. For example, the index file
|
||||
//! contains a list of layer files, so it must not be uploaded until all the
|
||||
//! layer files that are in its list have been successfully uploaded.
|
||||
//! deletions are performed in the right order. For example:
|
||||
//! - the index file contains a list of layer files, so it must not be uploaded
|
||||
//! until all the layer files that are in its list have been successfully uploaded.
|
||||
//! - objects must be removed from the index before being deleted, and that updated
|
||||
//! index must be written to remote storage before deleting the objects from remote storage.
|
||||
//!
|
||||
//! The contract between client and its user is that the user is responsible of
|
||||
//! scheduling operations in an order that keeps the remote consistent as
|
||||
@@ -70,10 +72,12 @@
|
||||
//! correct order, and the client will parallelize the operations in a way that
|
||||
//! is safe.
|
||||
//!
|
||||
//! The caller should be careful with deletion, though. They should not delete
|
||||
//! local files that have been scheduled for upload but not yet finished uploading.
|
||||
//! Otherwise the upload will fail. To wait for an upload to finish, use
|
||||
//! the 'wait_completion' function (more on that later.)
|
||||
//! The caller should be careful with deletion, though:
|
||||
//! - they should not delete local files that have been scheduled for upload but
|
||||
//! not yet finished uploading. Otherwise the upload will fail. To wait for an
|
||||
//! upload to finish, use the 'wait_completion' function (more on that later.)
|
||||
//! - they should not to remote deletions via DeletionQueue without waiting for
|
||||
//! the latest metadata to upload via RemoteTimelineClient.
|
||||
//!
|
||||
//! All of this relies on the following invariants:
|
||||
//!
|
||||
@@ -200,12 +204,11 @@
|
||||
//! [`Tenant::timeline_init_and_sync`]: super::Tenant::timeline_init_and_sync
|
||||
//! [`Timeline::load_layer_map`]: super::Timeline::load_layer_map
|
||||
|
||||
mod delete;
|
||||
mod download;
|
||||
pub mod index;
|
||||
mod upload;
|
||||
|
||||
use anyhow::Context;
|
||||
use anyhow::{bail, Context};
|
||||
use chrono::{NaiveDateTime, Utc};
|
||||
// re-export these
|
||||
pub use download::{is_temp_download_file, list_remote_timelines};
|
||||
@@ -216,7 +219,7 @@ use utils::backoff::{
|
||||
};
|
||||
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
use std::path::Path;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::atomic::{AtomicU32, Ordering};
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
@@ -226,6 +229,7 @@ use tracing::{debug, error, info, instrument, warn};
|
||||
use tracing::{info_span, Instrument};
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use crate::deletion_queue::DeletionQueueClient;
|
||||
use crate::metrics::{
|
||||
MeasureRemoteOp, RemoteOpFileKind, RemoteOpKind, RemoteTimelineClientMetrics,
|
||||
RemoteTimelineClientMetricsCallTrackSize, REMOTE_ONDEMAND_DOWNLOADED_BYTES,
|
||||
@@ -234,7 +238,6 @@ use crate::metrics::{
|
||||
use crate::task_mgr::shutdown_token;
|
||||
use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
|
||||
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
|
||||
use crate::tenant::upload_queue::Delete;
|
||||
use crate::{
|
||||
config::PageServerConf,
|
||||
task_mgr,
|
||||
@@ -244,6 +247,7 @@ use crate::{
|
||||
tenant::upload_queue::{
|
||||
UploadOp, UploadQueue, UploadQueueInitialized, UploadQueueStopped, UploadTask,
|
||||
},
|
||||
tenant::TIMELINES_SEGMENT_NAME,
|
||||
};
|
||||
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
@@ -252,6 +256,7 @@ use self::index::IndexPart;
|
||||
|
||||
use super::storage_layer::LayerFileName;
|
||||
use super::upload_queue::SetDeletedFlagProgress;
|
||||
use super::Generation;
|
||||
|
||||
// Occasional network issues and such can cause remote operations to fail, and
|
||||
// that's expected. If a download fails, we log it at info-level, and retry.
|
||||
@@ -315,6 +320,7 @@ pub struct RemoteTimelineClient {
|
||||
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
generation: Generation,
|
||||
|
||||
upload_queue: Mutex<UploadQueue>,
|
||||
|
||||
@@ -335,12 +341,14 @@ impl RemoteTimelineClient {
|
||||
conf: &'static PageServerConf,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
generation: Generation,
|
||||
) -> RemoteTimelineClient {
|
||||
RemoteTimelineClient {
|
||||
conf,
|
||||
runtime: BACKGROUND_RUNTIME.handle().to_owned(),
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
generation,
|
||||
storage_impl: remote_storage,
|
||||
upload_queue: Mutex::new(UploadQueue::Uninitialized),
|
||||
metrics: Arc::new(RemoteTimelineClientMetrics::new(&tenant_id, &timeline_id)),
|
||||
@@ -453,6 +461,7 @@ impl RemoteTimelineClient {
|
||||
&self.storage_impl,
|
||||
&self.tenant_id,
|
||||
&self.timeline_id,
|
||||
self.generation,
|
||||
)
|
||||
.measure_remote_op(
|
||||
self.tenant_id,
|
||||
@@ -631,51 +640,66 @@ impl RemoteTimelineClient {
|
||||
/// deletion won't actually be performed, until any previously scheduled
|
||||
/// upload operations, and the index file upload, have completed
|
||||
/// successfully.
|
||||
pub fn schedule_layer_file_deletion(
|
||||
pub async fn schedule_layer_file_deletion(
|
||||
self: &Arc<Self>,
|
||||
names: &[LayerFileName],
|
||||
deletion_queue_client: &DeletionQueueClient,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut guard = self.upload_queue.lock().unwrap();
|
||||
let upload_queue = guard.initialized_mut()?;
|
||||
// Synchronous update of upload queues under mutex
|
||||
let with_generations = {
|
||||
let mut guard = self.upload_queue.lock().unwrap();
|
||||
let upload_queue = guard.initialized_mut()?;
|
||||
|
||||
// Deleting layers doesn't affect the values stored in TimelineMetadata,
|
||||
// so we don't need update it. Just serialize it.
|
||||
let metadata = upload_queue.latest_metadata.clone();
|
||||
// Deleting layers doesn't affect the values stored in TimelineMetadata,
|
||||
// so we don't need update it. Just serialize it.
|
||||
let metadata = upload_queue.latest_metadata.clone();
|
||||
|
||||
// Update the remote index file, removing the to-be-deleted files from the index,
|
||||
// before deleting the actual files.
|
||||
//
|
||||
// Once we start removing files from upload_queue.latest_files, there's
|
||||
// no going back! Otherwise, some of the files would already be removed
|
||||
// from latest_files, but not yet scheduled for deletion. Use a closure
|
||||
// to syntactically forbid ? or bail! calls here.
|
||||
let no_bail_here = || {
|
||||
for name in names {
|
||||
if upload_queue.latest_files.remove(name).is_some() {
|
||||
upload_queue.latest_files_changes_since_metadata_upload_scheduled += 1;
|
||||
}
|
||||
}
|
||||
// Decorate our list of names with each name's generation, dropping
|
||||
// makes that are unexpectedly missing from our metadata.
|
||||
let with_generations: Vec<_> = names
|
||||
.into_iter()
|
||||
.filter_map(|name| {
|
||||
// Remove from latest_files, learning the file's remote generation in the process
|
||||
let meta = upload_queue.latest_files.remove(name);
|
||||
|
||||
if let Some(meta) = meta {
|
||||
upload_queue.latest_files_changes_since_metadata_upload_scheduled += 1;
|
||||
Some((name.clone(), meta.generation))
|
||||
} else {
|
||||
// This is unexpected: latest_files is meant to be kept up to
|
||||
// date. We can't delete the layer if we have forgotten what
|
||||
// generation it was in.
|
||||
warn!("Deleting layer {name} not found in latest_files list");
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
if upload_queue.latest_files_changes_since_metadata_upload_scheduled > 0 {
|
||||
self.schedule_index_upload(upload_queue, metadata);
|
||||
}
|
||||
|
||||
// schedule the actual deletions
|
||||
for name in names {
|
||||
let op = UploadOp::Delete(Delete {
|
||||
file_kind: RemoteOpFileKind::Layer,
|
||||
layer_file_name: name.clone(),
|
||||
scheduled_from_timeline_delete: false,
|
||||
});
|
||||
self.calls_unfinished_metric_begin(&op);
|
||||
upload_queue.queued_operations.push_back(op);
|
||||
info!("scheduled layer file deletion {name}");
|
||||
}
|
||||
|
||||
// Launch the tasks immediately, if possible
|
||||
self.launch_queued_tasks(upload_queue);
|
||||
with_generations
|
||||
};
|
||||
no_bail_here();
|
||||
|
||||
// Barrier: we must ensure all prior uploads and index writes have landed in S3
|
||||
// before emitting deletions.
|
||||
if let Err(e) = self.wait_completion().await {
|
||||
// This can only fail if upload queue is shut down: if this happens, we do
|
||||
// not emit any deletions. In this condition (remote client is shut down
|
||||
// during compaction or GC) we may leak some objects.
|
||||
bail!("Cannot complete layer file deletions during shutdown ({e})");
|
||||
}
|
||||
|
||||
// Enqueue deletions
|
||||
deletion_queue_client
|
||||
.push_layers(
|
||||
self.tenant_id,
|
||||
self.timeline_id,
|
||||
self.generation,
|
||||
with_generations,
|
||||
)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -761,10 +785,10 @@ impl RemoteTimelineClient {
|
||||
backoff::retry(
|
||||
|| {
|
||||
upload::upload_index_part(
|
||||
self.conf,
|
||||
&self.storage_impl,
|
||||
&self.tenant_id,
|
||||
&self.timeline_id,
|
||||
self.generation,
|
||||
&index_part_with_deleted_at,
|
||||
)
|
||||
},
|
||||
@@ -801,12 +825,13 @@ impl RemoteTimelineClient {
|
||||
/// Prerequisites: UploadQueue should be in stopped state and deleted_at should be successfuly set.
|
||||
/// The function deletes layer files one by one, then lists the prefix to see if we leaked something
|
||||
/// deletes leaked files if any and proceeds with deletion of index file at the end.
|
||||
pub(crate) async fn delete_all(self: &Arc<Self>) -> anyhow::Result<()> {
|
||||
pub(crate) async fn delete_all(
|
||||
self: &Arc<Self>,
|
||||
deletion_queue: &DeletionQueueClient,
|
||||
) -> anyhow::Result<()> {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
let (mut receiver, deletions_queued) = {
|
||||
let mut deletions_queued = 0;
|
||||
|
||||
let layers: Vec<_> = {
|
||||
let mut locked = self.upload_queue.lock().unwrap();
|
||||
let stopped = locked.stopped_mut()?;
|
||||
|
||||
@@ -818,40 +843,29 @@ impl RemoteTimelineClient {
|
||||
|
||||
stopped
|
||||
.upload_queue_for_deletion
|
||||
.queued_operations
|
||||
.reserve(stopped.upload_queue_for_deletion.latest_files.len());
|
||||
|
||||
// schedule the actual deletions
|
||||
for name in stopped.upload_queue_for_deletion.latest_files.keys() {
|
||||
let op = UploadOp::Delete(Delete {
|
||||
file_kind: RemoteOpFileKind::Layer,
|
||||
layer_file_name: name.clone(),
|
||||
scheduled_from_timeline_delete: true,
|
||||
});
|
||||
self.calls_unfinished_metric_begin(&op);
|
||||
stopped
|
||||
.upload_queue_for_deletion
|
||||
.queued_operations
|
||||
.push_back(op);
|
||||
|
||||
info!("scheduled layer file deletion {name}");
|
||||
deletions_queued += 1;
|
||||
}
|
||||
|
||||
self.launch_queued_tasks(&mut stopped.upload_queue_for_deletion);
|
||||
|
||||
(
|
||||
self.schedule_barrier(&mut stopped.upload_queue_for_deletion),
|
||||
deletions_queued,
|
||||
)
|
||||
.latest_files
|
||||
.drain()
|
||||
.map(|kv| (kv.0, kv.1.generation))
|
||||
.collect()
|
||||
};
|
||||
|
||||
receiver.changed().await.context("upload queue shut down")?;
|
||||
let layer_deletion_count = layers.len();
|
||||
|
||||
let layer_paths = layers
|
||||
.into_iter()
|
||||
.map(|(layer, generation)| {
|
||||
remote_layer_path(&self.tenant_id, &self.timeline_id, &layer, generation)
|
||||
})
|
||||
.collect();
|
||||
deletion_queue.push_immediate(layer_paths).await?;
|
||||
|
||||
// Do not delete index part yet, it is needed for possible retry. If we remove it first
|
||||
// and retry will arrive to different pageserver there wont be any traces of it on remote storage
|
||||
let timeline_path = self.conf.timeline_path(&self.tenant_id, &self.timeline_id);
|
||||
let timeline_storage_path = self.conf.remote_path(&timeline_path)?;
|
||||
let timeline_storage_path = remote_timeline_path(&self.tenant_id, &self.timeline_id);
|
||||
|
||||
// Execute all pending deletions, so that when we prroceed to do a list_prefixes below, we aren't
|
||||
// taking the burden of listing all the layers that we already know we should delete.
|
||||
deletion_queue.flush_immediate().await?;
|
||||
|
||||
let remaining = backoff::retry(
|
||||
|| async {
|
||||
@@ -880,17 +894,9 @@ impl RemoteTimelineClient {
|
||||
})
|
||||
.collect();
|
||||
|
||||
let not_referenced_count = remaining.len();
|
||||
if !remaining.is_empty() {
|
||||
backoff::retry(
|
||||
|| async { self.storage_impl.delete_objects(&remaining).await },
|
||||
|_e| false,
|
||||
FAILED_UPLOAD_WARN_THRESHOLD,
|
||||
FAILED_REMOTE_OP_RETRIES,
|
||||
"delete_objects",
|
||||
backoff::Cancel::new(shutdown_token(), || anyhow::anyhow!("Cancelled!")),
|
||||
)
|
||||
.await
|
||||
.context("delete_objects")?;
|
||||
deletion_queue.push_immediate(remaining).await?;
|
||||
}
|
||||
|
||||
fail::fail_point!("timeline-delete-before-index-delete", |_| {
|
||||
@@ -901,18 +907,14 @@ impl RemoteTimelineClient {
|
||||
|
||||
let index_file_path = timeline_storage_path.join(Path::new(IndexPart::FILE_NAME));
|
||||
|
||||
debug!("deleting index part");
|
||||
debug!("enqueuing index part deletion");
|
||||
deletion_queue
|
||||
.push_immediate([index_file_path].to_vec())
|
||||
.await?;
|
||||
|
||||
backoff::retry(
|
||||
|| async { self.storage_impl.delete(&index_file_path).await },
|
||||
|_e| false,
|
||||
FAILED_UPLOAD_WARN_THRESHOLD,
|
||||
FAILED_REMOTE_OP_RETRIES,
|
||||
"delete_index",
|
||||
backoff::Cancel::new(shutdown_token(), || anyhow::anyhow!("Cancelled")),
|
||||
)
|
||||
.await
|
||||
.context("delete_index")?;
|
||||
// Timeline deletion is rare and we have probably emitted a reasonably number of objects: wait
|
||||
// for a flush to a persistent deletion list so that we may be sure deletion will occur.
|
||||
deletion_queue.flush_immediate().await?;
|
||||
|
||||
fail::fail_point!("timeline-delete-after-index-delete", |_| {
|
||||
Err(anyhow::anyhow!(
|
||||
@@ -920,7 +922,7 @@ impl RemoteTimelineClient {
|
||||
))?
|
||||
});
|
||||
|
||||
info!(prefix=%timeline_storage_path, referenced=deletions_queued, not_referenced=%remaining.len(), "done deleting in timeline prefix, including index_part.json");
|
||||
info!(prefix=%timeline_storage_path, referenced=layer_deletion_count, not_referenced=%not_referenced_count, "done deleting in timeline prefix, including index_part.json");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -943,10 +945,6 @@ impl RemoteTimelineClient {
|
||||
// have finished.
|
||||
upload_queue.inprogress_tasks.is_empty()
|
||||
}
|
||||
UploadOp::Delete(_) => {
|
||||
// Wait for preceding uploads to finish. Concurrent deletions are OK, though.
|
||||
upload_queue.num_inprogress_deletions == upload_queue.inprogress_tasks.len()
|
||||
}
|
||||
|
||||
UploadOp::Barrier(_) => upload_queue.inprogress_tasks.is_empty(),
|
||||
};
|
||||
@@ -974,9 +972,6 @@ impl RemoteTimelineClient {
|
||||
UploadOp::UploadMetadata(_, _) => {
|
||||
upload_queue.num_inprogress_metadata_uploads += 1;
|
||||
}
|
||||
UploadOp::Delete(_) => {
|
||||
upload_queue.num_inprogress_deletions += 1;
|
||||
}
|
||||
UploadOp::Barrier(sender) => {
|
||||
sender.send_replace(());
|
||||
continue;
|
||||
@@ -1055,15 +1050,17 @@ impl RemoteTimelineClient {
|
||||
|
||||
let upload_result: anyhow::Result<()> = match &task.op {
|
||||
UploadOp::UploadLayer(ref layer_file_name, ref layer_metadata) => {
|
||||
let path = &self
|
||||
let path = self
|
||||
.conf
|
||||
.timeline_path(&self.tenant_id, &self.timeline_id)
|
||||
.join(layer_file_name.file_name());
|
||||
|
||||
upload::upload_timeline_layer(
|
||||
self.conf,
|
||||
&self.storage_impl,
|
||||
path,
|
||||
&path,
|
||||
layer_metadata,
|
||||
self.generation,
|
||||
)
|
||||
.measure_remote_op(
|
||||
self.tenant_id,
|
||||
@@ -1085,10 +1082,10 @@ impl RemoteTimelineClient {
|
||||
};
|
||||
|
||||
let res = upload::upload_index_part(
|
||||
self.conf,
|
||||
&self.storage_impl,
|
||||
&self.tenant_id,
|
||||
&self.timeline_id,
|
||||
self.generation,
|
||||
index_part,
|
||||
)
|
||||
.measure_remote_op(
|
||||
@@ -1108,21 +1105,6 @@ impl RemoteTimelineClient {
|
||||
}
|
||||
res
|
||||
}
|
||||
UploadOp::Delete(delete) => {
|
||||
let path = &self
|
||||
.conf
|
||||
.timeline_path(&self.tenant_id, &self.timeline_id)
|
||||
.join(delete.layer_file_name.file_name());
|
||||
delete::delete_layer(self.conf, &self.storage_impl, path)
|
||||
.measure_remote_op(
|
||||
self.tenant_id,
|
||||
self.timeline_id,
|
||||
delete.file_kind,
|
||||
RemoteOpKind::Delete,
|
||||
Arc::clone(&self.metrics),
|
||||
)
|
||||
.await
|
||||
}
|
||||
UploadOp::Barrier(_) => {
|
||||
// unreachable. Barrier operations are handled synchronously in
|
||||
// launch_queued_tasks
|
||||
@@ -1182,15 +1164,7 @@ impl RemoteTimelineClient {
|
||||
let mut upload_queue_guard = self.upload_queue.lock().unwrap();
|
||||
let upload_queue = match upload_queue_guard.deref_mut() {
|
||||
UploadQueue::Uninitialized => panic!("callers are responsible for ensuring this is only called on an initialized queue"),
|
||||
UploadQueue::Stopped(stopped) => {
|
||||
// Special care is needed for deletions, if it was an earlier deletion (not scheduled from deletion)
|
||||
// then stop() took care of it so we just return.
|
||||
// For deletions that come from delete_all we still want to maintain metrics, launch following tasks, etc.
|
||||
match &task.op {
|
||||
UploadOp::Delete(delete) if delete.scheduled_from_timeline_delete => Some(&mut stopped.upload_queue_for_deletion),
|
||||
_ => None
|
||||
}
|
||||
},
|
||||
UploadQueue::Stopped(_) => { None }
|
||||
UploadQueue::Initialized(qi) => { Some(qi) }
|
||||
};
|
||||
|
||||
@@ -1212,9 +1186,6 @@ impl RemoteTimelineClient {
|
||||
upload_queue.num_inprogress_metadata_uploads -= 1;
|
||||
upload_queue.last_uploaded_consistent_lsn = lsn; // XXX monotonicity check?
|
||||
}
|
||||
UploadOp::Delete(_) => {
|
||||
upload_queue.num_inprogress_deletions -= 1;
|
||||
}
|
||||
UploadOp::Barrier(_) => unreachable!(),
|
||||
};
|
||||
|
||||
@@ -1246,13 +1217,6 @@ impl RemoteTimelineClient {
|
||||
reason: "metadata uploads are tiny",
|
||||
},
|
||||
),
|
||||
UploadOp::Delete(delete) => (
|
||||
delete.file_kind,
|
||||
RemoteOpKind::Delete,
|
||||
DontTrackSize {
|
||||
reason: "should we track deletes? positive or negative sign?",
|
||||
},
|
||||
),
|
||||
UploadOp::Barrier(_) => {
|
||||
// we do not account these
|
||||
return None;
|
||||
@@ -1312,7 +1276,6 @@ impl RemoteTimelineClient {
|
||||
last_uploaded_consistent_lsn: initialized.last_uploaded_consistent_lsn,
|
||||
num_inprogress_layer_uploads: 0,
|
||||
num_inprogress_metadata_uploads: 0,
|
||||
num_inprogress_deletions: 0,
|
||||
inprogress_tasks: HashMap::default(),
|
||||
queued_operations: VecDeque::default(),
|
||||
};
|
||||
@@ -1333,9 +1296,7 @@ impl RemoteTimelineClient {
|
||||
|
||||
// consistency check
|
||||
assert_eq!(
|
||||
qi.num_inprogress_layer_uploads
|
||||
+ qi.num_inprogress_metadata_uploads
|
||||
+ qi.num_inprogress_deletions,
|
||||
qi.num_inprogress_layer_uploads + qi.num_inprogress_metadata_uploads,
|
||||
qi.inprogress_tasks.len()
|
||||
);
|
||||
|
||||
@@ -1360,14 +1321,84 @@ impl RemoteTimelineClient {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn remote_timelines_path(tenant_id: &TenantId) -> RemotePath {
|
||||
let path = format!("tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}");
|
||||
RemotePath::from_string(&path).expect("Failed to construct path")
|
||||
}
|
||||
|
||||
pub fn remote_timeline_path(tenant_id: &TenantId, timeline_id: &TimelineId) -> RemotePath {
|
||||
remote_timelines_path(tenant_id).join(&PathBuf::from(timeline_id.to_string()))
|
||||
}
|
||||
|
||||
pub fn remote_layer_path(
|
||||
tenant_id: &TenantId,
|
||||
timeline_id: &TimelineId,
|
||||
layer_file_name: &LayerFileName,
|
||||
generation: Generation,
|
||||
) -> RemotePath {
|
||||
// Generation-aware key format
|
||||
let path = format!(
|
||||
"tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{0}{1}",
|
||||
layer_file_name.file_name(),
|
||||
generation.get_suffix()
|
||||
);
|
||||
|
||||
RemotePath::from_string(&path).expect("Failed to construct path")
|
||||
}
|
||||
|
||||
pub fn remote_index_path(
|
||||
tenant_id: &TenantId,
|
||||
timeline_id: &TimelineId,
|
||||
generation: Generation,
|
||||
) -> RemotePath {
|
||||
RemotePath::from_string(&format!(
|
||||
"tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{0}{1}",
|
||||
IndexPart::FILE_NAME,
|
||||
generation.get_suffix()
|
||||
))
|
||||
.expect("Failed to construct path")
|
||||
}
|
||||
|
||||
/// Files on the remote storage are stored with paths, relative to the workdir.
|
||||
/// That path includes in itself both tenant and timeline ids, allowing to have a unique remote storage path.
|
||||
///
|
||||
/// Errors if the path provided does not start from pageserver's workdir.
|
||||
pub fn remote_path(
|
||||
conf: &PageServerConf,
|
||||
local_path: &Path,
|
||||
generation: Option<Generation>,
|
||||
) -> anyhow::Result<RemotePath> {
|
||||
let stripped = local_path
|
||||
.strip_prefix(&conf.workdir)
|
||||
.context("Failed to strip workdir prefix")?;
|
||||
|
||||
let suffixed = if let Some(generation) = generation {
|
||||
format!(
|
||||
"{0}{1}",
|
||||
stripped.to_string_lossy(),
|
||||
generation.get_suffix()
|
||||
)
|
||||
} else {
|
||||
stripped.to_string_lossy().to_string()
|
||||
};
|
||||
|
||||
RemotePath::new(&PathBuf::from(suffixed)).with_context(|| {
|
||||
format!(
|
||||
"Failed to resolve remote part of path {:?} for base {:?}",
|
||||
local_path, conf.workdir
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::{
|
||||
context::RequestContext,
|
||||
deletion_queue::mock::MockDeletionQueue,
|
||||
tenant::{
|
||||
harness::{TenantHarness, TIMELINE_ID},
|
||||
Tenant, Timeline,
|
||||
Generation, Tenant, Timeline,
|
||||
},
|
||||
DEFAULT_PG_VERSION,
|
||||
};
|
||||
@@ -1409,8 +1440,11 @@ mod tests {
|
||||
assert_eq!(avec, bvec);
|
||||
}
|
||||
|
||||
fn assert_remote_files(expected: &[&str], remote_path: &Path) {
|
||||
let mut expected: Vec<String> = expected.iter().map(|x| String::from(*x)).collect();
|
||||
fn assert_remote_files(expected: &[&str], remote_path: &Path, generation: Generation) {
|
||||
let mut expected: Vec<String> = expected
|
||||
.iter()
|
||||
.map(|x| format!("{}{}", x, generation.get_suffix()))
|
||||
.collect();
|
||||
expected.sort();
|
||||
|
||||
let mut found: Vec<String> = Vec::new();
|
||||
@@ -1431,6 +1465,7 @@ mod tests {
|
||||
tenant_ctx: RequestContext,
|
||||
remote_fs_dir: PathBuf,
|
||||
client: Arc<RemoteTimelineClient>,
|
||||
deletion_queue: MockDeletionQueue,
|
||||
}
|
||||
|
||||
impl TestSetup {
|
||||
@@ -1461,6 +1496,8 @@ mod tests {
|
||||
storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()),
|
||||
};
|
||||
|
||||
let generation = Generation::new(0xdeadbeef);
|
||||
|
||||
let storage = GenericRemoteStorage::from_config(&storage_config).unwrap();
|
||||
|
||||
let client = Arc::new(RemoteTimelineClient {
|
||||
@@ -1468,7 +1505,8 @@ mod tests {
|
||||
runtime: tokio::runtime::Handle::current(),
|
||||
tenant_id: harness.tenant_id,
|
||||
timeline_id: TIMELINE_ID,
|
||||
storage_impl: storage,
|
||||
generation,
|
||||
storage_impl: storage.clone(),
|
||||
upload_queue: Mutex::new(UploadQueue::Uninitialized),
|
||||
metrics: Arc::new(RemoteTimelineClientMetrics::new(
|
||||
&harness.tenant_id,
|
||||
@@ -1476,6 +1514,8 @@ mod tests {
|
||||
)),
|
||||
});
|
||||
|
||||
let deletion_queue = MockDeletionQueue::new(Some(storage));
|
||||
|
||||
Ok(Self {
|
||||
harness,
|
||||
tenant,
|
||||
@@ -1483,6 +1523,7 @@ mod tests {
|
||||
tenant_ctx: ctx,
|
||||
remote_fs_dir,
|
||||
client,
|
||||
deletion_queue,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -1511,6 +1552,7 @@ mod tests {
|
||||
tenant_ctx: _tenant_ctx,
|
||||
remote_fs_dir,
|
||||
client,
|
||||
deletion_queue,
|
||||
} = TestSetup::new("upload_scheduling").await.unwrap();
|
||||
|
||||
let timeline_path = harness.timeline_path(&TIMELINE_ID);
|
||||
@@ -1526,6 +1568,8 @@ mod tests {
|
||||
.init_upload_queue_for_empty_remote(&metadata)
|
||||
.unwrap();
|
||||
|
||||
let generation = Generation::new(0xdeadbeef);
|
||||
|
||||
// Create a couple of dummy files, schedule upload for them
|
||||
let layer_file_name_1: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap();
|
||||
let layer_file_name_2: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D9-00000000016B5A52".parse().unwrap();
|
||||
@@ -1545,13 +1589,13 @@ mod tests {
|
||||
client
|
||||
.schedule_layer_file_upload(
|
||||
&layer_file_name_1,
|
||||
&LayerFileMetadata::new(content_1.len() as u64),
|
||||
&LayerFileMetadata::new(content_1.len() as u64, generation),
|
||||
)
|
||||
.unwrap();
|
||||
client
|
||||
.schedule_layer_file_upload(
|
||||
&layer_file_name_2,
|
||||
&LayerFileMetadata::new(content_2.len() as u64),
|
||||
&LayerFileMetadata::new(content_2.len() as u64, generation),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@@ -1615,23 +1659,17 @@ mod tests {
|
||||
client
|
||||
.schedule_layer_file_upload(
|
||||
&layer_file_name_3,
|
||||
&LayerFileMetadata::new(content_3.len() as u64),
|
||||
&LayerFileMetadata::new(content_3.len() as u64, generation),
|
||||
)
|
||||
.unwrap();
|
||||
client
|
||||
.schedule_layer_file_deletion(&[layer_file_name_1.clone()])
|
||||
.unwrap();
|
||||
|
||||
{
|
||||
let mut guard = client.upload_queue.lock().unwrap();
|
||||
let upload_queue = guard.initialized_mut().unwrap();
|
||||
|
||||
// Deletion schedules upload of the index file, and the file deletion itself
|
||||
assert!(upload_queue.queued_operations.len() == 2);
|
||||
assert!(upload_queue.inprogress_tasks.len() == 1);
|
||||
assert!(upload_queue.num_inprogress_layer_uploads == 1);
|
||||
assert!(upload_queue.num_inprogress_deletions == 0);
|
||||
assert!(upload_queue.latest_files_changes_since_metadata_upload_scheduled == 0);
|
||||
assert_eq!(upload_queue.queued_operations.len(), 0);
|
||||
assert_eq!(upload_queue.num_inprogress_layer_uploads, 1);
|
||||
}
|
||||
|
||||
assert_remote_files(
|
||||
&[
|
||||
&layer_file_name_1.file_name(),
|
||||
@@ -1639,10 +1677,50 @@ mod tests {
|
||||
"index_part.json",
|
||||
],
|
||||
&remote_timeline_dir,
|
||||
generation,
|
||||
);
|
||||
|
||||
// Finish them
|
||||
client
|
||||
.schedule_layer_file_deletion(
|
||||
&[layer_file_name_1.clone()],
|
||||
&deletion_queue.new_client(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
{
|
||||
let mut guard = client.upload_queue.lock().unwrap();
|
||||
let upload_queue = guard.initialized_mut().unwrap();
|
||||
|
||||
// Deletion schedules upload of the index file via RemoteTimelineClient, and
|
||||
// deletion of layer files via DeletionQueue. The uploads have all been flushed
|
||||
// because schedule_layer_file_deletion does a wait_completion before pushing
|
||||
// to the deletion_queue
|
||||
assert_eq!(upload_queue.queued_operations.len(), 0);
|
||||
assert_eq!(upload_queue.inprogress_tasks.len(), 0);
|
||||
assert_eq!(upload_queue.num_inprogress_layer_uploads, 0);
|
||||
assert_eq!(
|
||||
upload_queue.latest_files_changes_since_metadata_upload_scheduled,
|
||||
0
|
||||
);
|
||||
}
|
||||
assert_remote_files(
|
||||
&[
|
||||
&layer_file_name_1.file_name(),
|
||||
&layer_file_name_2.file_name(),
|
||||
&layer_file_name_3.file_name(),
|
||||
"index_part.json",
|
||||
],
|
||||
&remote_timeline_dir,
|
||||
generation,
|
||||
);
|
||||
|
||||
// Finish uploads and deletions
|
||||
client.wait_completion().await.unwrap();
|
||||
deletion_queue.pump().await;
|
||||
|
||||
// 1 layer was deleted
|
||||
assert_eq!(deletion_queue.get_executed(), 1);
|
||||
|
||||
assert_remote_files(
|
||||
&[
|
||||
@@ -1651,6 +1729,7 @@ mod tests {
|
||||
"index_part.json",
|
||||
],
|
||||
&remote_timeline_dir,
|
||||
generation,
|
||||
);
|
||||
}
|
||||
|
||||
@@ -1703,12 +1782,14 @@ mod tests {
|
||||
|
||||
// Test
|
||||
|
||||
let generation = Generation::new(0xdeadbeef);
|
||||
|
||||
let init = get_bytes_started_stopped();
|
||||
|
||||
client
|
||||
.schedule_layer_file_upload(
|
||||
&layer_file_name_1,
|
||||
&LayerFileMetadata::new(content_1.len() as u64),
|
||||
&LayerFileMetadata::new(content_1.len() as u64, generation),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@@ -1743,4 +1824,23 @@ mod tests {
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
// #[tokio::test]
|
||||
// async fn index_part_download() {
|
||||
// let TestSetup {
|
||||
// harness,
|
||||
// tenant: _tenant,
|
||||
// timeline: _timeline,
|
||||
// client,
|
||||
// ..
|
||||
// } = TestSetup::new("index_part_download").await.unwrap();
|
||||
|
||||
// let example_index_part = IndexPart {
|
||||
// version: 3,
|
||||
// timeline_layers: HashSet::new(),
|
||||
// layer_metadata:
|
||||
|
||||
// }
|
||||
|
||||
// }
|
||||
}
|
||||
|
||||
@@ -1,29 +0,0 @@
|
||||
//! Helper functions to delete files from remote storage with a RemoteStorage
|
||||
use anyhow::Context;
|
||||
use std::path::Path;
|
||||
use tracing::debug;
|
||||
|
||||
use remote_storage::GenericRemoteStorage;
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
|
||||
pub(super) async fn delete_layer<'a>(
|
||||
conf: &'static PageServerConf,
|
||||
storage: &'a GenericRemoteStorage,
|
||||
local_layer_path: &'a Path,
|
||||
) -> anyhow::Result<()> {
|
||||
fail::fail_point!("before-delete-layer", |_| {
|
||||
anyhow::bail!("failpoint before-delete-layer")
|
||||
});
|
||||
debug!("Deleting layer from remote storage: {local_layer_path:?}",);
|
||||
|
||||
let path_to_delete = conf.remote_path(local_layer_path)?;
|
||||
|
||||
// We don't want to print an error if the delete failed if the file has
|
||||
// already been deleted. Thankfully, in this situation S3 already
|
||||
// does not yield an error. While OS-provided local file system APIs do yield
|
||||
// errors, we avoid them in the `LocalFs` wrapper.
|
||||
storage.delete(&path_to_delete).await.with_context(|| {
|
||||
format!("Failed to delete remote layer from storage at {path_to_delete:?}")
|
||||
})
|
||||
}
|
||||
@@ -15,14 +15,16 @@ use tokio_util::sync::CancellationToken;
|
||||
use utils::{backoff, crashsafe};
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path};
|
||||
use crate::tenant::storage_layer::LayerFileName;
|
||||
use crate::tenant::timeline::span::debug_assert_current_span_has_tenant_and_timeline_id;
|
||||
use remote_storage::{DownloadError, GenericRemoteStorage};
|
||||
use crate::tenant::Generation;
|
||||
use remote_storage::{DownloadError, GenericRemoteStorage, RemotePath};
|
||||
use utils::crashsafe::path_with_suffix_extension;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
|
||||
use super::index::{IndexPart, LayerFileMetadata};
|
||||
use super::{FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES};
|
||||
use super::{remote_index_path, FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES};
|
||||
|
||||
static MAX_DOWNLOAD_DURATION: Duration = Duration::from_secs(120);
|
||||
|
||||
@@ -41,13 +43,16 @@ pub async fn download_layer_file<'a>(
|
||||
) -> Result<u64, DownloadError> {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
let timeline_path = conf.timeline_path(&tenant_id, &timeline_id);
|
||||
let local_path = conf
|
||||
.timeline_path(&tenant_id, &timeline_id)
|
||||
.join(layer_file_name.file_name());
|
||||
|
||||
let local_path = timeline_path.join(layer_file_name.file_name());
|
||||
|
||||
let remote_path = conf
|
||||
.remote_path(&local_path)
|
||||
.map_err(DownloadError::Other)?;
|
||||
let remote_path = remote_layer_path(
|
||||
&tenant_id,
|
||||
&timeline_id,
|
||||
layer_file_name,
|
||||
layer_metadata.generation,
|
||||
);
|
||||
|
||||
// Perform a rename inspired by durable_rename from file_utils.c.
|
||||
// The sequence:
|
||||
@@ -173,21 +178,19 @@ pub fn is_temp_download_file(path: &Path) -> bool {
|
||||
}
|
||||
|
||||
/// List timelines of given tenant in remote storage
|
||||
pub async fn list_remote_timelines<'a>(
|
||||
storage: &'a GenericRemoteStorage,
|
||||
conf: &'static PageServerConf,
|
||||
pub async fn list_remote_timelines(
|
||||
storage: &GenericRemoteStorage,
|
||||
tenant_id: TenantId,
|
||||
) -> anyhow::Result<HashSet<TimelineId>> {
|
||||
let tenant_path = conf.timelines_path(&tenant_id);
|
||||
let tenant_storage_path = conf.remote_path(&tenant_path)?;
|
||||
let remote_path = remote_timelines_path(&tenant_id);
|
||||
|
||||
fail::fail_point!("storage-sync-list-remote-timelines", |_| {
|
||||
anyhow::bail!("storage-sync-list-remote-timelines");
|
||||
});
|
||||
|
||||
let timelines = download_retry(
|
||||
|| storage.list_prefixes(Some(&tenant_storage_path)),
|
||||
&format!("list prefixes for {tenant_path:?}"),
|
||||
|| storage.list_prefixes(Some(&remote_path)),
|
||||
&format!("list prefixes for {tenant_id}"),
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -221,46 +224,140 @@ pub async fn list_remote_timelines<'a>(
|
||||
Ok(timeline_ids)
|
||||
}
|
||||
|
||||
async fn do_download_index_part(
|
||||
local_path: &Path,
|
||||
storage: &GenericRemoteStorage,
|
||||
tenant_id: &TenantId,
|
||||
timeline_id: &TimelineId,
|
||||
index_generation: Generation,
|
||||
) -> Result<IndexPart, DownloadError> {
|
||||
let remote_path = remote_index_path(tenant_id, timeline_id, index_generation);
|
||||
|
||||
let index_part_bytes = download_retry(
|
||||
|| storage.download_all(&remote_path),
|
||||
&format!("download {remote_path:?}"),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let index_part: IndexPart = serde_json::from_slice(&index_part_bytes)
|
||||
.with_context(|| format!("Failed to deserialize index part file into file {local_path:?}"))
|
||||
.map_err(DownloadError::Other)?;
|
||||
|
||||
Ok(index_part)
|
||||
}
|
||||
|
||||
pub(super) async fn download_index_part(
|
||||
conf: &'static PageServerConf,
|
||||
storage: &GenericRemoteStorage,
|
||||
tenant_id: &TenantId,
|
||||
timeline_id: &TimelineId,
|
||||
my_generation: Generation,
|
||||
) -> Result<IndexPart, DownloadError> {
|
||||
let index_part_path = conf
|
||||
let local_path = conf
|
||||
.metadata_path(tenant_id, timeline_id)
|
||||
.with_file_name(IndexPart::FILE_NAME);
|
||||
let part_storage_path = conf
|
||||
.remote_path(&index_part_path)
|
||||
.map_err(DownloadError::BadInput)?;
|
||||
|
||||
let index_part_bytes = download_retry(
|
||||
|| async {
|
||||
let mut index_part_download = storage.download(&part_storage_path).await?;
|
||||
if my_generation.is_none() {
|
||||
// Operating without generations: just fetch the generation-less path
|
||||
return do_download_index_part(&local_path, storage, tenant_id, timeline_id, my_generation)
|
||||
.await;
|
||||
}
|
||||
|
||||
let mut index_part_bytes = Vec::new();
|
||||
tokio::io::copy(
|
||||
&mut index_part_download.download_stream,
|
||||
&mut index_part_bytes,
|
||||
)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!("Failed to download an index part into file {index_part_path:?}")
|
||||
})
|
||||
.map_err(DownloadError::Other)?;
|
||||
Ok(index_part_bytes)
|
||||
},
|
||||
&format!("download {part_storage_path:?}"),
|
||||
let previous_gen = my_generation.previous();
|
||||
let r_previous =
|
||||
do_download_index_part(&local_path, storage, tenant_id, timeline_id, previous_gen).await;
|
||||
|
||||
match r_previous {
|
||||
Ok(index_part) => {
|
||||
tracing::debug!("Found index_part from previous generation {previous_gen}");
|
||||
return Ok(index_part);
|
||||
}
|
||||
Err(e) => {
|
||||
if matches!(e, DownloadError::NotFound) {
|
||||
tracing::debug!("No index_part found from previous generation {previous_gen}, falling back to listing");
|
||||
} else {
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
/// Given the key of an index, parse out the generation part of the name
|
||||
fn parse_generation(path: RemotePath) -> Option<Generation> {
|
||||
let path = path.take();
|
||||
let file_name = match path.file_name() {
|
||||
Some(f) => f,
|
||||
None => {
|
||||
// Unexpected: we should be seeing index_part.json paths only
|
||||
tracing::warn!("Malformed index key {0}", path.display());
|
||||
return None;
|
||||
}
|
||||
};
|
||||
|
||||
let file_name_str = match file_name.to_str() {
|
||||
Some(s) => s,
|
||||
None => {
|
||||
tracing::warn!("Malformed index key {0}", path.display());
|
||||
return None;
|
||||
}
|
||||
};
|
||||
|
||||
match file_name_str.split_once("-") {
|
||||
Some((_, gen_suffix)) => u32::from_str_radix(gen_suffix, 16)
|
||||
.map(|g| Generation::new(g))
|
||||
.ok(),
|
||||
None => None,
|
||||
}
|
||||
}
|
||||
|
||||
// Fallback: we did not find an index_part.json from the previous generation, so
|
||||
// we will list all the index_part objects and pick the most recent.
|
||||
let index_prefix = remote_index_path(tenant_id, timeline_id, Generation::none());
|
||||
let indices = backoff::retry(
|
||||
|| async { storage.list_files(Some(&index_prefix)).await },
|
||||
|_| false,
|
||||
FAILED_DOWNLOAD_WARN_THRESHOLD,
|
||||
FAILED_REMOTE_OP_RETRIES,
|
||||
"listing index_part files",
|
||||
// TODO: use a cancellation token (https://github.com/neondatabase/neon/issues/5066)
|
||||
backoff::Cancel::new(CancellationToken::new(), || -> anyhow::Error {
|
||||
unreachable!()
|
||||
}),
|
||||
)
|
||||
.await?;
|
||||
.await
|
||||
.map_err(|e| DownloadError::Other(e))?;
|
||||
|
||||
let index_part: IndexPart = serde_json::from_slice(&index_part_bytes)
|
||||
.with_context(|| {
|
||||
format!("Failed to deserialize index part file into file {index_part_path:?}")
|
||||
})
|
||||
.map_err(DownloadError::Other)?;
|
||||
let mut generations: Vec<_> = indices
|
||||
.into_iter()
|
||||
.filter_map(|k| parse_generation(k))
|
||||
.filter(|g| g <= &my_generation)
|
||||
.collect();
|
||||
|
||||
Ok(index_part)
|
||||
generations.sort();
|
||||
match generations.last() {
|
||||
Some(g) => {
|
||||
tracing::debug!("Found index_part in generation {g} (my generation {my_generation})");
|
||||
do_download_index_part(&local_path, storage, tenant_id, timeline_id, *g).await
|
||||
}
|
||||
None => {
|
||||
// This is not an error: the timeline may be newly created, or we may be
|
||||
// upgrading and have no historical index_part with a generation suffix.
|
||||
// Fall back to trying to load the un-suffixed index_part.json.
|
||||
tracing::info!(
|
||||
"No index_part.json-* found when loading {}/{} in generation {}",
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
my_generation
|
||||
);
|
||||
return do_download_index_part(
|
||||
&local_path,
|
||||
storage,
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
Generation::none(),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Helper function to handle retries for a download operation.
|
||||
|
||||
@@ -12,6 +12,7 @@ use utils::bin_ser::SerializeError;
|
||||
use crate::tenant::metadata::TimelineMetadata;
|
||||
use crate::tenant::storage_layer::LayerFileName;
|
||||
use crate::tenant::upload_queue::UploadQueueInitialized;
|
||||
use crate::tenant::Generation;
|
||||
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
@@ -20,22 +21,28 @@ use utils::lsn::Lsn;
|
||||
/// Fields have to be `Option`s because remote [`IndexPart`]'s can be from different version, which
|
||||
/// might have less or more metadata depending if upgrading or rolling back an upgrade.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
|
||||
#[cfg_attr(test, derive(Default))]
|
||||
//#[cfg_attr(test, derive(Default))]
|
||||
pub struct LayerFileMetadata {
|
||||
file_size: u64,
|
||||
|
||||
pub(crate) generation: Generation,
|
||||
}
|
||||
|
||||
impl From<&'_ IndexLayerMetadata> for LayerFileMetadata {
|
||||
fn from(other: &IndexLayerMetadata) -> Self {
|
||||
LayerFileMetadata {
|
||||
file_size: other.file_size,
|
||||
generation: other.generation,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl LayerFileMetadata {
|
||||
pub fn new(file_size: u64) -> Self {
|
||||
LayerFileMetadata { file_size }
|
||||
pub fn new(file_size: u64, generation: Generation) -> Self {
|
||||
LayerFileMetadata {
|
||||
file_size,
|
||||
generation,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn file_size(&self) -> u64 {
|
||||
@@ -135,15 +142,20 @@ impl TryFrom<&UploadQueueInitialized> for IndexPart {
|
||||
}
|
||||
|
||||
/// Serialized form of [`LayerFileMetadata`].
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, Default)]
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
|
||||
pub struct IndexLayerMetadata {
|
||||
pub(super) file_size: u64,
|
||||
|
||||
#[serde(default = "Generation::none")]
|
||||
#[serde(skip_serializing_if = "Generation::is_none")]
|
||||
pub(super) generation: Generation,
|
||||
}
|
||||
|
||||
impl From<&'_ LayerFileMetadata> for IndexLayerMetadata {
|
||||
fn from(other: &'_ LayerFileMetadata) -> Self {
|
||||
IndexLayerMetadata {
|
||||
file_size: other.file_size,
|
||||
generation: other.generation,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -172,11 +184,13 @@ mod tests {
|
||||
layer_metadata: HashMap::from([
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), IndexLayerMetadata {
|
||||
file_size: 25600000,
|
||||
generation: Generation::none()
|
||||
}),
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), IndexLayerMetadata {
|
||||
// serde_json should always parse this but this might be a double with jq for
|
||||
// example.
|
||||
file_size: 9007199254741001,
|
||||
generation: Generation::none()
|
||||
})
|
||||
]),
|
||||
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
@@ -209,11 +223,13 @@ mod tests {
|
||||
layer_metadata: HashMap::from([
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), IndexLayerMetadata {
|
||||
file_size: 25600000,
|
||||
generation: Generation::none()
|
||||
}),
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), IndexLayerMetadata {
|
||||
// serde_json should always parse this but this might be a double with jq for
|
||||
// example.
|
||||
file_size: 9007199254741001,
|
||||
generation: Generation::none()
|
||||
})
|
||||
]),
|
||||
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
@@ -247,11 +263,13 @@ mod tests {
|
||||
layer_metadata: HashMap::from([
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), IndexLayerMetadata {
|
||||
file_size: 25600000,
|
||||
generation: Generation::none()
|
||||
}),
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), IndexLayerMetadata {
|
||||
// serde_json should always parse this but this might be a double with jq for
|
||||
// example.
|
||||
file_size: 9007199254741001,
|
||||
generation: Generation::none()
|
||||
})
|
||||
]),
|
||||
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
|
||||
@@ -5,7 +5,11 @@ use fail::fail_point;
|
||||
use std::{io::ErrorKind, path::Path};
|
||||
use tokio::fs;
|
||||
|
||||
use crate::{config::PageServerConf, tenant::remote_timeline_client::index::IndexPart};
|
||||
use super::Generation;
|
||||
use crate::{
|
||||
config::PageServerConf,
|
||||
tenant::remote_timeline_client::{index::IndexPart, remote_index_path, remote_path},
|
||||
};
|
||||
use remote_storage::GenericRemoteStorage;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
|
||||
@@ -15,10 +19,10 @@ use tracing::info;
|
||||
|
||||
/// Serializes and uploads the given index part data to the remote storage.
|
||||
pub(super) async fn upload_index_part<'a>(
|
||||
conf: &'static PageServerConf,
|
||||
storage: &'a GenericRemoteStorage,
|
||||
tenant_id: &TenantId,
|
||||
timeline_id: &TimelineId,
|
||||
generation: Generation,
|
||||
index_part: &'a IndexPart,
|
||||
) -> anyhow::Result<()> {
|
||||
tracing::trace!("uploading new index part");
|
||||
@@ -32,13 +36,9 @@ pub(super) async fn upload_index_part<'a>(
|
||||
let index_part_size = index_part_bytes.len();
|
||||
let index_part_bytes = tokio::io::BufReader::new(std::io::Cursor::new(index_part_bytes));
|
||||
|
||||
let index_part_path = conf
|
||||
.metadata_path(tenant_id, timeline_id)
|
||||
.with_file_name(IndexPart::FILE_NAME);
|
||||
let storage_path = conf.remote_path(&index_part_path)?;
|
||||
|
||||
let remote_path = remote_index_path(tenant_id, timeline_id, generation);
|
||||
storage
|
||||
.upload_storage_object(Box::new(index_part_bytes), index_part_size, &storage_path)
|
||||
.upload_storage_object(Box::new(index_part_bytes), index_part_size, &remote_path)
|
||||
.await
|
||||
.with_context(|| format!("Failed to upload index part for '{tenant_id} / {timeline_id}'"))
|
||||
}
|
||||
@@ -52,12 +52,13 @@ pub(super) async fn upload_timeline_layer<'a>(
|
||||
storage: &'a GenericRemoteStorage,
|
||||
source_path: &'a Path,
|
||||
known_metadata: &'a LayerFileMetadata,
|
||||
generation: Generation,
|
||||
) -> anyhow::Result<()> {
|
||||
fail_point!("before-upload-layer", |_| {
|
||||
bail!("failpoint before-upload-layer")
|
||||
});
|
||||
let storage_path = conf.remote_path(source_path)?;
|
||||
|
||||
let storage_path = remote_path(conf, source_path, Some(generation))?;
|
||||
let source_file_res = fs::File::open(&source_path).await;
|
||||
let source_file = match source_file_res {
|
||||
Ok(source_file) => source_file,
|
||||
|
||||
@@ -38,6 +38,7 @@ use std::time::{Duration, Instant, SystemTime};
|
||||
use crate::context::{
|
||||
AccessStatsBehavior, DownloadBehavior, RequestContext, RequestContextBuilder,
|
||||
};
|
||||
use crate::deletion_queue::DeletionQueueClient;
|
||||
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
|
||||
use crate::tenant::storage_layer::delta_layer::DeltaEntry;
|
||||
use crate::tenant::storage_layer::{
|
||||
@@ -67,6 +68,7 @@ use postgres_connection::PgConnectionConfig;
|
||||
use postgres_ffi::to_pg_timestamp;
|
||||
use utils::{
|
||||
completion,
|
||||
generation::Generation,
|
||||
id::{TenantId, TimelineId},
|
||||
lsn::{AtomicLsn, Lsn, RecordLsn},
|
||||
seqwait::SeqWait,
|
||||
@@ -141,6 +143,7 @@ fn drop_wlock<T>(rlock: tokio::sync::RwLockWriteGuard<'_, T>) {
|
||||
/// The outward-facing resources required to build a Timeline
|
||||
pub struct TimelineResources {
|
||||
pub remote_client: Option<RemoteTimelineClient>,
|
||||
pub deletion_queue_client: Option<DeletionQueueClient>,
|
||||
}
|
||||
|
||||
pub struct Timeline {
|
||||
@@ -152,6 +155,9 @@ pub struct Timeline {
|
||||
pub tenant_id: TenantId,
|
||||
pub timeline_id: TimelineId,
|
||||
|
||||
// The generation of the tenant that instantiated us: this is used for safety when writing remote objects
|
||||
generation: Generation,
|
||||
|
||||
pub pg_version: u32,
|
||||
|
||||
/// The tuple has two elements.
|
||||
@@ -195,6 +201,9 @@ pub struct Timeline {
|
||||
/// See [`remote_timeline_client`](super::remote_timeline_client) module comment for details.
|
||||
pub remote_client: Option<Arc<RemoteTimelineClient>>,
|
||||
|
||||
/// Deletion queue: a global queue, separate to the remote storage queue's
|
||||
deletion_queue_client: Option<Arc<DeletionQueueClient>>,
|
||||
|
||||
// What page versions do we hold in the repository? If we get a
|
||||
// request > last_record_lsn, we need to wait until we receive all
|
||||
// the WAL up to the request. The SeqWait provides functions for
|
||||
@@ -1199,7 +1208,7 @@ impl Timeline {
|
||||
Ok(delta) => Some(delta),
|
||||
};
|
||||
|
||||
let layer_metadata = LayerFileMetadata::new(layer_file_size);
|
||||
let layer_metadata = LayerFileMetadata::new(layer_file_size, self.generation);
|
||||
|
||||
let new_remote_layer = Arc::new(match local_layer.filename() {
|
||||
LayerFileName::Image(image_name) => RemoteLayer::new_img(
|
||||
@@ -1262,6 +1271,18 @@ impl Timeline {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn delete_all_remote(&self) -> anyhow::Result<()> {
|
||||
if let Some(remote_client) = &self.remote_client {
|
||||
if let Some(deletion_queue_client) = &self.deletion_queue_client {
|
||||
remote_client.delete_all(deletion_queue_client).await
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
@@ -1377,6 +1398,7 @@ impl Timeline {
|
||||
ancestor: Option<Arc<Timeline>>,
|
||||
timeline_id: TimelineId,
|
||||
tenant_id: TenantId,
|
||||
generation: Generation,
|
||||
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
|
||||
resources: TimelineResources,
|
||||
pg_version: u32,
|
||||
@@ -1406,6 +1428,7 @@ impl Timeline {
|
||||
myself: myself.clone(),
|
||||
timeline_id,
|
||||
tenant_id,
|
||||
generation,
|
||||
pg_version,
|
||||
layers: Arc::new(tokio::sync::RwLock::new(LayerManager::create())),
|
||||
wanted_image_layers: Mutex::new(None),
|
||||
@@ -1414,6 +1437,7 @@ impl Timeline {
|
||||
walreceiver: Mutex::new(None),
|
||||
|
||||
remote_client: resources.remote_client.map(Arc::new),
|
||||
deletion_queue_client: resources.deletion_queue_client.map(Arc::new),
|
||||
|
||||
// initialize in-memory 'last_record_lsn' from 'disk_consistent_lsn'.
|
||||
last_record_lsn: SeqWait::new(RecordLsn {
|
||||
@@ -1615,6 +1639,9 @@ impl Timeline {
|
||||
let (conf, tenant_id, timeline_id) = (self.conf, self.tenant_id, self.timeline_id);
|
||||
let span = tracing::Span::current();
|
||||
|
||||
// Copy to move into the task we're about to spawn
|
||||
let generation = self.generation;
|
||||
|
||||
let (loaded_layers, to_sync, total_physical_size) = tokio::task::spawn_blocking({
|
||||
move || {
|
||||
let _g = span.entered();
|
||||
@@ -1656,8 +1683,12 @@ impl Timeline {
|
||||
);
|
||||
}
|
||||
|
||||
let decided =
|
||||
init::reconcile(discovered_layers, index_part.as_ref(), disk_consistent_lsn);
|
||||
let decided = init::reconcile(
|
||||
discovered_layers,
|
||||
index_part.as_ref(),
|
||||
disk_consistent_lsn,
|
||||
generation,
|
||||
);
|
||||
|
||||
let mut loaded_layers = Vec::new();
|
||||
let mut needs_upload = Vec::new();
|
||||
@@ -1750,11 +1781,15 @@ impl Timeline {
|
||||
guard.initialize_local_layers(loaded_layers, disk_consistent_lsn + 1);
|
||||
|
||||
if let Some(rtc) = self.remote_client.as_ref() {
|
||||
// Deletion queue client is always Some if remote_client is Some
|
||||
let deletion_queue_client = self.deletion_queue_client.as_ref().unwrap();
|
||||
|
||||
let (needs_upload, needs_cleanup) = to_sync;
|
||||
for (layer, m) in needs_upload {
|
||||
rtc.schedule_layer_file_upload(&layer.layer_desc().filename(), &m)?;
|
||||
}
|
||||
rtc.schedule_layer_file_deletion(&needs_cleanup)?;
|
||||
rtc.schedule_layer_file_deletion(&needs_cleanup, deletion_queue_client)
|
||||
.await?;
|
||||
rtc.schedule_index_upload_for_file_changes()?;
|
||||
// Tenant::create_timeline will wait for these uploads to happen before returning, or
|
||||
// on retry.
|
||||
@@ -2669,7 +2704,7 @@ impl Timeline {
|
||||
(
|
||||
HashMap::from([(
|
||||
layer.filename(),
|
||||
LayerFileMetadata::new(layer.layer_desc().file_size),
|
||||
LayerFileMetadata::new(layer.layer_desc().file_size, self.generation),
|
||||
)]),
|
||||
Some(layer),
|
||||
)
|
||||
@@ -3065,7 +3100,10 @@ impl Timeline {
|
||||
.metadata()
|
||||
.with_context(|| format!("reading metadata of layer file {}", path.file_name()))?;
|
||||
|
||||
layer_paths_to_upload.insert(path, LayerFileMetadata::new(metadata.len()));
|
||||
layer_paths_to_upload.insert(
|
||||
path,
|
||||
LayerFileMetadata::new(metadata.len(), self.generation),
|
||||
);
|
||||
|
||||
self.metrics
|
||||
.resident_physical_size_gauge
|
||||
@@ -3740,7 +3778,7 @@ impl Timeline {
|
||||
if let Some(remote_client) = &self.remote_client {
|
||||
remote_client.schedule_layer_file_upload(
|
||||
&l.filename(),
|
||||
&LayerFileMetadata::new(metadata.len()),
|
||||
&LayerFileMetadata::new(metadata.len(), self.generation),
|
||||
)?;
|
||||
}
|
||||
|
||||
@@ -3749,7 +3787,10 @@ impl Timeline {
|
||||
.resident_physical_size_gauge
|
||||
.add(metadata.len());
|
||||
|
||||
new_layer_paths.insert(new_delta_path, LayerFileMetadata::new(metadata.len()));
|
||||
new_layer_paths.insert(
|
||||
new_delta_path,
|
||||
LayerFileMetadata::new(metadata.len(), self.generation),
|
||||
);
|
||||
l.access_stats().record_residence_event(
|
||||
LayerResidenceStatus::Resident,
|
||||
LayerResidenceEventReason::LayerCreate,
|
||||
@@ -3789,7 +3830,13 @@ impl Timeline {
|
||||
|
||||
// Also schedule the deletions in remote storage
|
||||
if let Some(remote_client) = &self.remote_client {
|
||||
remote_client.schedule_layer_file_deletion(&layer_names_to_delete)?;
|
||||
let deletion_queue = self
|
||||
.deletion_queue_client
|
||||
.as_ref()
|
||||
.ok_or_else(|| anyhow::anyhow!("Remote storage enabled without deletion queue"))?;
|
||||
remote_client
|
||||
.schedule_layer_file_deletion(&layer_names_to_delete, deletion_queue)
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -4123,7 +4170,15 @@ impl Timeline {
|
||||
}
|
||||
|
||||
if let Some(remote_client) = &self.remote_client {
|
||||
remote_client.schedule_layer_file_deletion(&layer_names_to_delete)?;
|
||||
// Remote metadata upload was scheduled in `update_metadata_file`: wait
|
||||
// for completion before scheduling any deletions.
|
||||
remote_client.wait_completion().await?;
|
||||
let deletion_queue = self.deletion_queue_client.as_ref().ok_or_else(|| {
|
||||
anyhow::anyhow!("Remote storage enabled without deletion queue")
|
||||
})?;
|
||||
remote_client
|
||||
.schedule_layer_file_deletion(&layer_names_to_delete, deletion_queue)
|
||||
.await?;
|
||||
}
|
||||
|
||||
apply.flush();
|
||||
@@ -4713,6 +4768,7 @@ mod tests {
|
||||
|
||||
use utils::{id::TimelineId, lsn::Lsn};
|
||||
|
||||
use crate::deletion_queue::mock::MockDeletionQueue;
|
||||
use crate::tenant::{harness::TenantHarness, storage_layer::PersistentLayer};
|
||||
|
||||
use super::{EvictionError, Timeline};
|
||||
@@ -4735,9 +4791,17 @@ mod tests {
|
||||
};
|
||||
GenericRemoteStorage::from_config(&config).unwrap()
|
||||
};
|
||||
let deletion_queue = MockDeletionQueue::new(Some(remote_storage.clone()), harness.conf);
|
||||
|
||||
let ctx = any_context();
|
||||
let tenant = harness.try_load(&ctx, Some(remote_storage)).await.unwrap();
|
||||
let tenant = harness
|
||||
.try_load(
|
||||
&ctx,
|
||||
Some(remote_storage),
|
||||
Some(deletion_queue.new_client()),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let timeline = tenant
|
||||
.create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
|
||||
.await
|
||||
@@ -4800,9 +4864,17 @@ mod tests {
|
||||
};
|
||||
GenericRemoteStorage::from_config(&config).unwrap()
|
||||
};
|
||||
let deletion_queue = MockDeletionQueue::new(Some(remote_storage.clone()), harness.conf);
|
||||
|
||||
let ctx = any_context();
|
||||
let tenant = harness.try_load(&ctx, Some(remote_storage)).await.unwrap();
|
||||
let tenant = harness
|
||||
.try_load(
|
||||
&ctx,
|
||||
Some(remote_storage),
|
||||
Some(deletion_queue.new_client()),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let timeline = tenant
|
||||
.create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
|
||||
.await
|
||||
|
||||
@@ -14,6 +14,7 @@ use utils::{
|
||||
|
||||
use crate::{
|
||||
config::PageServerConf,
|
||||
deletion_queue::DeletionQueueClient,
|
||||
task_mgr::{self, TaskKind},
|
||||
tenant::{
|
||||
metadata::TimelineMetadata,
|
||||
@@ -238,15 +239,6 @@ async fn delete_local_layer_files(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Removes remote layers and an index file after them.
|
||||
async fn delete_remote_layers_and_index(timeline: &Timeline) -> anyhow::Result<()> {
|
||||
if let Some(remote_client) = &timeline.remote_client {
|
||||
remote_client.delete_all().await.context("delete_all")?
|
||||
};
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// This function removs remaining traces of a timeline on disk.
|
||||
// Namely: metadata file, timeline directory, delete mark.
|
||||
// Note: io::ErrorKind::NotFound are ignored for metadata and timeline dir.
|
||||
@@ -407,6 +399,7 @@ impl DeleteTimelineFlow {
|
||||
timeline_id: TimelineId,
|
||||
local_metadata: &TimelineMetadata,
|
||||
remote_client: Option<RemoteTimelineClient>,
|
||||
deletion_queue_client: Option<DeletionQueueClient>,
|
||||
init_order: Option<&InitializationOrder>,
|
||||
) -> anyhow::Result<()> {
|
||||
// Note: here we even skip populating layer map. Timeline is essentially uninitialized.
|
||||
@@ -416,7 +409,10 @@ impl DeleteTimelineFlow {
|
||||
timeline_id,
|
||||
local_metadata,
|
||||
None, // Ancestor is not needed for deletion.
|
||||
TimelineResources { remote_client },
|
||||
TimelineResources {
|
||||
remote_client,
|
||||
deletion_queue_client,
|
||||
},
|
||||
init_order,
|
||||
// Important. We dont pass ancestor above because it can be missing.
|
||||
// Thus we need to skip the validation here.
|
||||
@@ -559,7 +555,7 @@ impl DeleteTimelineFlow {
|
||||
) -> Result<(), DeleteTimelineError> {
|
||||
delete_local_layer_files(conf, tenant.tenant_id, timeline).await?;
|
||||
|
||||
delete_remote_layers_and_index(timeline).await?;
|
||||
timeline.delete_all_remote().await?;
|
||||
|
||||
pausable_failpoint!("in_progress_delete");
|
||||
|
||||
|
||||
@@ -7,6 +7,7 @@ use crate::{
|
||||
index::{IndexPart, LayerFileMetadata},
|
||||
},
|
||||
storage_layer::LayerFileName,
|
||||
Generation,
|
||||
},
|
||||
METADATA_FILE_NAME,
|
||||
};
|
||||
@@ -104,6 +105,7 @@ pub(super) fn reconcile(
|
||||
discovered: Vec<(LayerFileName, u64)>,
|
||||
index_part: Option<&IndexPart>,
|
||||
disk_consistent_lsn: Lsn,
|
||||
generation: Generation,
|
||||
) -> Vec<(LayerFileName, Result<Decision, FutureLayer>)> {
|
||||
use Decision::*;
|
||||
|
||||
@@ -112,7 +114,15 @@ pub(super) fn reconcile(
|
||||
|
||||
let mut discovered = discovered
|
||||
.into_iter()
|
||||
.map(|(name, file_size)| (name, (Some(LayerFileMetadata::new(file_size)), None)))
|
||||
.map(|(name, file_size)| {
|
||||
(
|
||||
name,
|
||||
// The generation here will be corrected to match IndexPart in the merge below, unless
|
||||
// it is not in IndexPart, in which case using our current generation makes sense
|
||||
// because it will be uploaded in this generation.
|
||||
(Some(LayerFileMetadata::new(file_size, generation)), None),
|
||||
)
|
||||
})
|
||||
.collect::<Collected>();
|
||||
|
||||
// merge any index_part information, when available
|
||||
@@ -137,7 +147,11 @@ pub(super) fn reconcile(
|
||||
Err(FutureLayer { local })
|
||||
} else {
|
||||
Ok(match (local, remote) {
|
||||
(Some(local), Some(remote)) if local != remote => UseRemote { local, remote },
|
||||
(Some(local), Some(remote)) if local != remote => {
|
||||
assert_eq!(local.generation, remote.generation);
|
||||
|
||||
UseRemote { local, remote }
|
||||
}
|
||||
(Some(x), Some(_)) => UseLocal(x),
|
||||
(None, Some(x)) => Evicted(x),
|
||||
(Some(x), None) => NeedsUpload(x),
|
||||
|
||||
@@ -1,5 +1,3 @@
|
||||
use crate::metrics::RemoteOpFileKind;
|
||||
|
||||
use super::storage_layer::LayerFileName;
|
||||
use crate::tenant::metadata::TimelineMetadata;
|
||||
use crate::tenant::remote_timeline_client::index::IndexPart;
|
||||
@@ -62,7 +60,6 @@ pub(crate) struct UploadQueueInitialized {
|
||||
// Breakdown of different kinds of tasks currently in-progress
|
||||
pub(crate) num_inprogress_layer_uploads: usize,
|
||||
pub(crate) num_inprogress_metadata_uploads: usize,
|
||||
pub(crate) num_inprogress_deletions: usize,
|
||||
|
||||
/// Tasks that are currently in-progress. In-progress means that a tokio Task
|
||||
/// has been launched for it. An in-progress task can be busy uploading, but it can
|
||||
@@ -120,7 +117,6 @@ impl UploadQueue {
|
||||
task_counter: 0,
|
||||
num_inprogress_layer_uploads: 0,
|
||||
num_inprogress_metadata_uploads: 0,
|
||||
num_inprogress_deletions: 0,
|
||||
inprogress_tasks: HashMap::new(),
|
||||
queued_operations: VecDeque::new(),
|
||||
};
|
||||
@@ -162,7 +158,6 @@ impl UploadQueue {
|
||||
task_counter: 0,
|
||||
num_inprogress_layer_uploads: 0,
|
||||
num_inprogress_metadata_uploads: 0,
|
||||
num_inprogress_deletions: 0,
|
||||
inprogress_tasks: HashMap::new(),
|
||||
queued_operations: VecDeque::new(),
|
||||
};
|
||||
@@ -200,13 +195,6 @@ pub(crate) struct UploadTask {
|
||||
pub(crate) op: UploadOp,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct Delete {
|
||||
pub(crate) file_kind: RemoteOpFileKind,
|
||||
pub(crate) layer_file_name: LayerFileName,
|
||||
pub(crate) scheduled_from_timeline_delete: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) enum UploadOp {
|
||||
/// Upload a layer file
|
||||
@@ -215,9 +203,6 @@ pub(crate) enum UploadOp {
|
||||
/// Upload the metadata file
|
||||
UploadMetadata(IndexPart, Lsn),
|
||||
|
||||
/// Delete a layer file
|
||||
Delete(Delete),
|
||||
|
||||
/// Barrier. When the barrier operation is reached,
|
||||
Barrier(tokio::sync::watch::Sender<()>),
|
||||
}
|
||||
@@ -233,13 +218,9 @@ impl std::fmt::Display for UploadOp {
|
||||
metadata.file_size()
|
||||
)
|
||||
}
|
||||
UploadOp::UploadMetadata(_, lsn) => write!(f, "UploadMetadata(lsn: {})", lsn),
|
||||
UploadOp::Delete(delete) => write!(
|
||||
f,
|
||||
"Delete(path: {}, scheduled_from_timeline_delete: {})",
|
||||
delete.layer_file_name.file_name(),
|
||||
delete.scheduled_from_timeline_delete
|
||||
),
|
||||
UploadOp::UploadMetadata(_, lsn) => {
|
||||
write!(f, "UploadMetadata(lsn: {})", lsn)
|
||||
}
|
||||
UploadOp::Barrier(_) => write!(f, "Barrier"),
|
||||
}
|
||||
}
|
||||
|
||||
1
pageserver/src/test.log
Normal file
1
pageserver/src/test.log
Normal file
@@ -0,0 +1 @@
|
||||
-bash: scripts/pytest: No such file or directory
|
||||
@@ -428,6 +428,7 @@ class NeonEnvBuilder:
|
||||
preserve_database_files: bool = False,
|
||||
initial_tenant: Optional[TenantId] = None,
|
||||
initial_timeline: Optional[TimelineId] = None,
|
||||
enable_generations: bool = False,
|
||||
):
|
||||
self.repo_dir = repo_dir
|
||||
self.rust_log_override = rust_log_override
|
||||
@@ -454,6 +455,7 @@ class NeonEnvBuilder:
|
||||
self.preserve_database_files = preserve_database_files
|
||||
self.initial_tenant = initial_tenant or TenantId.generate()
|
||||
self.initial_timeline = initial_timeline or TimelineId.generate()
|
||||
self.enable_generations = False
|
||||
|
||||
def init_configs(self) -> NeonEnv:
|
||||
# Cannot create more than one environment from one builder
|
||||
@@ -713,6 +715,9 @@ class NeonEnvBuilder:
|
||||
sk.stop(immediate=True)
|
||||
self.env.pageserver.stop(immediate=True)
|
||||
|
||||
if self.env.attachment_service is not None:
|
||||
self.env.attachment_service.stop(immediate=True)
|
||||
|
||||
cleanup_error = None
|
||||
try:
|
||||
self.cleanup_remote_storage()
|
||||
@@ -766,6 +771,8 @@ class NeonEnv:
|
||||
the tenant id
|
||||
"""
|
||||
|
||||
PAGESERVER_ID = 1
|
||||
|
||||
def __init__(self, config: NeonEnvBuilder):
|
||||
self.repo_dir = config.repo_dir
|
||||
self.rust_log_override = config.rust_log_override
|
||||
@@ -789,6 +796,14 @@ class NeonEnv:
|
||||
self.initial_tenant = config.initial_tenant
|
||||
self.initial_timeline = config.initial_timeline
|
||||
|
||||
if config.enable_generations:
|
||||
attachment_service_port = self.port_distributor.get_port()
|
||||
self.control_plane_api: Optional[str] = f"http://127.0.0.1:{attachment_service_port}"
|
||||
self.attachment_service: Optional[NeonAttachmentService] = NeonAttachmentService(self)
|
||||
else:
|
||||
self.control_plane_api = None
|
||||
self.attachment_service = None
|
||||
|
||||
# Create a config file corresponding to the options
|
||||
toml = textwrap.dedent(
|
||||
f"""
|
||||
@@ -814,7 +829,7 @@ class NeonEnv:
|
||||
toml += textwrap.dedent(
|
||||
f"""
|
||||
[pageserver]
|
||||
id=1
|
||||
id={self.PAGESERVER_ID}
|
||||
listen_pg_addr = 'localhost:{pageserver_port.pg}'
|
||||
listen_http_addr = 'localhost:{pageserver_port.http}'
|
||||
pg_auth_type = '{pg_auth_type}'
|
||||
@@ -822,6 +837,13 @@ class NeonEnv:
|
||||
"""
|
||||
)
|
||||
|
||||
if self.control_plane_api is not None:
|
||||
toml += textwrap.dedent(
|
||||
f"""
|
||||
control_plane_api = '{self.control_plane_api}'
|
||||
"""
|
||||
)
|
||||
|
||||
# Create a corresponding NeonPageserver object
|
||||
self.pageserver = NeonPageserver(
|
||||
self, port=pageserver_port, config_override=config.pageserver_config_override
|
||||
@@ -868,6 +890,9 @@ class NeonEnv:
|
||||
def start(self):
|
||||
# Start up broker, pageserver and all safekeepers
|
||||
self.broker.try_start()
|
||||
|
||||
if self.attachment_service is not None:
|
||||
self.attachment_service.start()
|
||||
self.pageserver.start()
|
||||
|
||||
for safekeeper in self.safekeepers:
|
||||
@@ -1289,6 +1314,16 @@ class NeonCli(AbstractNeonCli):
|
||||
res.check_returncode()
|
||||
return res
|
||||
|
||||
def attachment_service_start(self):
|
||||
cmd = ["attachment_service", "start"]
|
||||
return self.raw_cli(cmd)
|
||||
|
||||
def attachment_service_stop(self, immediate: bool):
|
||||
cmd = ["attachment_service", "stop"]
|
||||
if immediate:
|
||||
cmd.extend(["-m", "immediate"])
|
||||
return self.raw_cli(cmd)
|
||||
|
||||
def pageserver_start(
|
||||
self,
|
||||
overrides: Tuple[str, ...] = (),
|
||||
@@ -1470,6 +1505,33 @@ class ComputeCtl(AbstractNeonCli):
|
||||
COMMAND = "compute_ctl"
|
||||
|
||||
|
||||
class NeonAttachmentService:
|
||||
def __init__(self, env: NeonEnv):
|
||||
self.env = env
|
||||
|
||||
def start(self):
|
||||
self.env.neon_cli.attachment_service_start()
|
||||
self.running = True
|
||||
return self
|
||||
|
||||
def stop(self, immediate: bool = False) -> "NeonAttachmentService":
|
||||
if self.running:
|
||||
self.env.neon_cli.attachment_service_stop(immediate)
|
||||
self.running = False
|
||||
return self
|
||||
|
||||
def __enter__(self) -> "NeonAttachmentService":
|
||||
return self
|
||||
|
||||
def __exit__(
|
||||
self,
|
||||
exc_type: Optional[Type[BaseException]],
|
||||
exc: Optional[BaseException],
|
||||
tb: Optional[TracebackType],
|
||||
):
|
||||
self.stop(immediate=True)
|
||||
|
||||
|
||||
class NeonPageserver(PgProtocol):
|
||||
"""
|
||||
An object representing a running pageserver.
|
||||
@@ -1633,6 +1695,26 @@ class NeonPageserver(PgProtocol):
|
||||
|
||||
return None
|
||||
|
||||
def tenant_attach(
|
||||
self, tenant_id: TenantId, config: None | Dict[str, Any] = None, config_null: bool = False
|
||||
):
|
||||
"""
|
||||
Tenant attachment passes through here to acquire a generation number before proceeding
|
||||
to call into the pageserver HTTP client.
|
||||
"""
|
||||
if self.env.attachment_service is not None:
|
||||
response = requests.post(
|
||||
f"{self.env.control_plane_api}/attach_hook",
|
||||
json={"tenant_id": str(tenant_id), "pageserver_id": self.env.PAGESERVER_ID},
|
||||
)
|
||||
response.raise_for_status()
|
||||
generation = response.json()["gen"]
|
||||
else:
|
||||
generation = None
|
||||
|
||||
client = self.env.pageserver.http_client()
|
||||
return client.tenant_attach(tenant_id, config, config_null, generation=generation)
|
||||
|
||||
|
||||
def append_pageserver_param_overrides(
|
||||
params_to_update: List[str],
|
||||
|
||||
@@ -186,18 +186,25 @@ class PageserverHttpClient(requests.Session):
|
||||
return TenantId(new_tenant_id)
|
||||
|
||||
def tenant_attach(
|
||||
self, tenant_id: TenantId, config: None | Dict[str, Any] = None, config_null: bool = False
|
||||
self,
|
||||
tenant_id: TenantId,
|
||||
config: None | Dict[str, Any] = None,
|
||||
config_null: bool = False,
|
||||
generation: Optional[int] = None,
|
||||
):
|
||||
if config_null:
|
||||
assert config is None
|
||||
body = "null"
|
||||
body: Any = None
|
||||
else:
|
||||
# null-config is prohibited by the API
|
||||
config = config or {}
|
||||
body = json.dumps({"config": config})
|
||||
body = {"config": config}
|
||||
if generation is not None:
|
||||
body.update({"generation": generation})
|
||||
|
||||
res = self.post(
|
||||
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/attach",
|
||||
data=body,
|
||||
data=json.dumps(body),
|
||||
headers={"Content-Type": "application/json"},
|
||||
)
|
||||
self.verbose_error(res)
|
||||
@@ -613,3 +620,8 @@ class PageserverHttpClient(requests.Session):
|
||||
},
|
||||
)
|
||||
self.verbose_error(res)
|
||||
|
||||
def deletion_queue_flush(self, execute: bool = False):
|
||||
self.put(
|
||||
f"http://localhost:{self.port}/v1/deletion_queue/flush?execute={'true' if execute else 'false'}"
|
||||
).raise_for_status()
|
||||
|
||||
@@ -7,7 +7,10 @@ from fixtures.neon_fixtures import NeonEnvBuilder
|
||||
|
||||
# Test restarting page server, while safekeeper and compute node keep
|
||||
# running.
|
||||
def test_pageserver_restart(neon_env_builder: NeonEnvBuilder):
|
||||
@pytest.mark.parametrize("generations", [True, False])
|
||||
def test_pageserver_restart(neon_env_builder: NeonEnvBuilder, generations: bool):
|
||||
neon_env_builder.enable_generations = generations
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
env.neon_cli.create_branch("test_pageserver_restart")
|
||||
|
||||
@@ -12,7 +12,10 @@ from typing import Dict, List, Optional, Tuple
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnv,
|
||||
NeonEnvBuilder,
|
||||
PgBin,
|
||||
last_flush_lsn_upload,
|
||||
wait_for_last_flush_lsn,
|
||||
)
|
||||
from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient
|
||||
@@ -52,9 +55,9 @@ from requests import ReadTimeout
|
||||
#
|
||||
# The tests are done for all types of remote storage pageserver supports.
|
||||
@pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
|
||||
@pytest.mark.parametrize("generations", [True, False])
|
||||
def test_remote_storage_backup_and_restore(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
remote_storage_kind: RemoteStorageKind,
|
||||
neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind, generations: bool
|
||||
):
|
||||
# Use this test to check more realistic SK ids: some etcd key parsing bugs were related,
|
||||
# and this test needs SK to write data to pageserver, so it will be visible
|
||||
@@ -65,6 +68,8 @@ def test_remote_storage_backup_and_restore(
|
||||
test_name="test_remote_storage_backup_and_restore",
|
||||
)
|
||||
|
||||
neon_env_builder.enable_generations = generations
|
||||
|
||||
# Exercise retry code path by making all uploads and downloads fail for the
|
||||
# first time. The retries print INFO-messages to the log; we will check
|
||||
# that they are present after the test.
|
||||
@@ -155,7 +160,8 @@ def test_remote_storage_backup_and_restore(
|
||||
# background task to load the tenant. In that background task,
|
||||
# listing the remote timelines will fail because of the failpoint,
|
||||
# and the tenant will be marked as Broken.
|
||||
client.tenant_attach(tenant_id)
|
||||
# client.tenant_attach(tenant_id)
|
||||
env.pageserver.tenant_attach(tenant_id)
|
||||
|
||||
tenant_info = wait_until_tenant_state(pageserver_http, tenant_id, "Broken", 15)
|
||||
assert tenant_info["attachment_status"] == {
|
||||
@@ -165,7 +171,7 @@ def test_remote_storage_backup_and_restore(
|
||||
|
||||
# Ensure that even though the tenant is broken, we can't attach it again.
|
||||
with pytest.raises(Exception, match=f"tenant {tenant_id} already exists, state: Broken"):
|
||||
client.tenant_attach(tenant_id)
|
||||
env.pageserver.tenant_attach(tenant_id)
|
||||
|
||||
# Restart again, this implicitly clears the failpoint.
|
||||
# test_remote_failures=1 remains active, though, as it's in the pageserver config.
|
||||
@@ -183,7 +189,7 @@ def test_remote_storage_backup_and_restore(
|
||||
# Ensure that the pageserver remembers that the tenant was attaching, by
|
||||
# trying to attach it again. It should fail.
|
||||
with pytest.raises(Exception, match=f"tenant {tenant_id} already exists, state:"):
|
||||
client.tenant_attach(tenant_id)
|
||||
env.pageserver.tenant_attach(tenant_id)
|
||||
log.info("waiting for tenant to become active. this should be quick with on-demand download")
|
||||
|
||||
wait_until_tenant_active(
|
||||
@@ -250,35 +256,20 @@ def test_remote_storage_upload_queue_retries(
|
||||
|
||||
client = env.pageserver.http_client()
|
||||
|
||||
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
|
||||
|
||||
endpoint.safe_psql("CREATE TABLE foo (id INTEGER PRIMARY KEY, val text)")
|
||||
|
||||
def configure_storage_sync_failpoints(action):
|
||||
def configure_storage_write_failpoints(action):
|
||||
client.configure_failpoints(
|
||||
[
|
||||
("before-upload-layer", action),
|
||||
("before-upload-index", action),
|
||||
("before-delete-layer", action),
|
||||
]
|
||||
)
|
||||
|
||||
def overwrite_data_and_wait_for_it_to_arrive_at_pageserver(data):
|
||||
# create initial set of layers & upload them with failpoints configured
|
||||
endpoint.safe_psql_many(
|
||||
def configure_storage_delete_failpoints(action):
|
||||
client.configure_failpoints(
|
||||
[
|
||||
f"""
|
||||
INSERT INTO foo (id, val)
|
||||
SELECT g, '{data}'
|
||||
FROM generate_series(1, 20000) g
|
||||
ON CONFLICT (id) DO UPDATE
|
||||
SET val = EXCLUDED.val
|
||||
""",
|
||||
# to ensure that GC can actually remove some layers
|
||||
"VACUUM foo",
|
||||
("deletion-queue-before-execute", action),
|
||||
]
|
||||
)
|
||||
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
|
||||
|
||||
def get_queued_count(file_kind, op_kind):
|
||||
val = client.get_remote_timeline_client_metric(
|
||||
@@ -291,23 +282,52 @@ def test_remote_storage_upload_queue_retries(
|
||||
assert val is not None, "expecting metric to be present"
|
||||
return int(val)
|
||||
|
||||
# create some layers & wait for uploads to finish
|
||||
overwrite_data_and_wait_for_it_to_arrive_at_pageserver("a")
|
||||
client.timeline_checkpoint(tenant_id, timeline_id)
|
||||
client.timeline_compact(tenant_id, timeline_id)
|
||||
overwrite_data_and_wait_for_it_to_arrive_at_pageserver("b")
|
||||
client.timeline_checkpoint(tenant_id, timeline_id)
|
||||
client.timeline_compact(tenant_id, timeline_id)
|
||||
gc_result = client.timeline_gc(tenant_id, timeline_id, 0)
|
||||
print_gc_result(gc_result)
|
||||
assert gc_result["layers_removed"] > 0
|
||||
def get_deletions_executed() -> int:
|
||||
executed = client.get_metric_value("pageserver_deletion_queue_executed_total")
|
||||
if executed is None:
|
||||
return 0
|
||||
else:
|
||||
return int(executed)
|
||||
|
||||
wait_until(2, 1, lambda: get_queued_count(file_kind="layer", op_kind="upload") == 0)
|
||||
wait_until(2, 1, lambda: get_queued_count(file_kind="index", op_kind="upload") == 0)
|
||||
wait_until(2, 1, lambda: get_queued_count(file_kind="layer", op_kind="delete") == 0)
|
||||
def get_deletion_errors(op_type) -> int:
|
||||
executed = client.get_metric_value(
|
||||
"pageserver_deletion_queue_errors_total", {"op_kind": op_type}
|
||||
)
|
||||
if executed is None:
|
||||
return 0
|
||||
else:
|
||||
return int(executed)
|
||||
|
||||
def assert_queued_count(file_kind: str, op_kind: str, fn):
|
||||
v = get_queued_count(file_kind=file_kind, op_kind=op_kind)
|
||||
log.info(f"queue count: {file_kind} {op_kind} {v}")
|
||||
assert fn(v)
|
||||
|
||||
# Push some uploads into the remote_timeline_client queues, before failpoints
|
||||
# are enabled: these should execute and the queue should revert to zero depth
|
||||
generate_uploads_and_deletions(env, tenant_id=tenant_id, timeline_id=timeline_id)
|
||||
|
||||
wait_until(2, 1, lambda: assert_queued_count("layer", "upload", lambda v: v == 0))
|
||||
wait_until(2, 1, lambda: assert_queued_count("index", "upload", lambda v: v == 0))
|
||||
|
||||
# Wait for some deletions to happen in the above compactions, assert that
|
||||
# our metrics of interest exist
|
||||
wait_until(2, 1, lambda: assert_deletion_queue(client, lambda v: v is not None))
|
||||
|
||||
# Before enabling failpoints, flushing deletions through should work
|
||||
client.deletion_queue_flush(execute=True)
|
||||
executed = client.get_metric_value("pageserver_deletion_queue_executed_total")
|
||||
assert executed is not None
|
||||
assert executed > 0
|
||||
|
||||
# let all future operations queue up
|
||||
configure_storage_sync_failpoints("return")
|
||||
configure_storage_write_failpoints("return")
|
||||
configure_storage_delete_failpoints("return")
|
||||
|
||||
# Snapshot of executed deletions: should not increment while failpoint is enabled
|
||||
deletions_executed_pre_failpoint = client.get_metric_value(
|
||||
"pageserver_deletion_queue_executed_total"
|
||||
)
|
||||
|
||||
# Create more churn to generate all upload ops.
|
||||
# The checkpoint / compact / gc ops will block because they call remote_client.wait_completion().
|
||||
@@ -315,38 +335,77 @@ def test_remote_storage_upload_queue_retries(
|
||||
churn_thread_result = [False]
|
||||
|
||||
def churn_while_failpoints_active(result):
|
||||
overwrite_data_and_wait_for_it_to_arrive_at_pageserver("c")
|
||||
client.timeline_checkpoint(tenant_id, timeline_id)
|
||||
client.timeline_compact(tenant_id, timeline_id)
|
||||
overwrite_data_and_wait_for_it_to_arrive_at_pageserver("d")
|
||||
client.timeline_checkpoint(tenant_id, timeline_id)
|
||||
client.timeline_compact(tenant_id, timeline_id)
|
||||
gc_result = client.timeline_gc(tenant_id, timeline_id, 0)
|
||||
print_gc_result(gc_result)
|
||||
assert gc_result["layers_removed"] > 0
|
||||
generate_uploads_and_deletions(
|
||||
env, init=False, tenant_id=tenant_id, timeline_id=timeline_id, data="d"
|
||||
)
|
||||
result[0] = True
|
||||
|
||||
churn_while_failpoints_active_thread = threading.Thread(
|
||||
target=churn_while_failpoints_active, args=[churn_thread_result]
|
||||
)
|
||||
log.info("Entered churn phase")
|
||||
churn_while_failpoints_active_thread.start()
|
||||
|
||||
# wait for churn thread's data to get stuck in the upload queue
|
||||
wait_until(10, 0.1, lambda: get_queued_count(file_kind="layer", op_kind="upload") > 0)
|
||||
wait_until(10, 0.1, lambda: get_queued_count(file_kind="index", op_kind="upload") >= 2)
|
||||
wait_until(10, 0.1, lambda: get_queued_count(file_kind="layer", op_kind="delete") > 0)
|
||||
try:
|
||||
# wait for churn thread's data to get stuck in the upload queue
|
||||
wait_until(10, 0.1, lambda: assert_queued_count("layer", "upload", lambda v: v > 0))
|
||||
wait_until(10, 0.1, lambda: assert_queued_count("index", "upload", lambda v: v >= 2))
|
||||
|
||||
# unblock churn operations
|
||||
configure_storage_sync_failpoints("off")
|
||||
# Deletion queue should not grow, because deletions wait for upload of
|
||||
# metadata, and we blocked that upload.
|
||||
wait_until(10, 0.5, lambda: assert_deletion_queue(client, lambda v: v == 0))
|
||||
|
||||
# ... and wait for them to finish. Exponential back-off in upload queue, so, gracious timeouts.
|
||||
wait_until(30, 1, lambda: get_queued_count(file_kind="layer", op_kind="upload") == 0)
|
||||
wait_until(30, 1, lambda: get_queued_count(file_kind="index", op_kind="upload") == 0)
|
||||
wait_until(30, 1, lambda: get_queued_count(file_kind="layer", op_kind="delete") == 0)
|
||||
# No more deletions should have executed
|
||||
assert get_deletions_executed() == deletions_executed_pre_failpoint
|
||||
|
||||
# unblock write operations
|
||||
log.info("Unblocking remote writes")
|
||||
configure_storage_write_failpoints("off")
|
||||
|
||||
# ... and wait for them to finish. Exponential back-off in upload queue, so, gracious timeouts.
|
||||
wait_until(30, 1, lambda: assert_queued_count("layer", "upload", lambda v: v == 0))
|
||||
wait_until(30, 1, lambda: assert_queued_count("index", "upload", lambda v: v == 0))
|
||||
|
||||
# Deletions should have been enqueued now that index uploads proceeded
|
||||
log.info("Waiting to see deletions enqueued")
|
||||
wait_until(10, 1, lambda: assert_deletion_queue(client, lambda v: v > 0))
|
||||
|
||||
# Run flush in the backgrorund because it will block on the failpoint
|
||||
class background_flush(threading.Thread):
|
||||
def run(self):
|
||||
client.deletion_queue_flush(execute=True)
|
||||
|
||||
flusher = background_flush()
|
||||
flusher.start()
|
||||
|
||||
def assert_failpoint_hit():
|
||||
assert get_deletion_errors("failpoint") > 0
|
||||
|
||||
# Our background flush thread should induce us to hit the failpoint
|
||||
wait_until(20, 0.25, assert_failpoint_hit)
|
||||
|
||||
# Deletions should not have been executed while failpoint is still active.
|
||||
assert get_deletion_queue_depth(client) is not None
|
||||
assert get_deletion_queue_depth(client) > 0
|
||||
assert get_deletions_executed() == deletions_executed_pre_failpoint
|
||||
|
||||
log.info("Unblocking remote deletes")
|
||||
configure_storage_delete_failpoints("off")
|
||||
|
||||
# An API flush should now complete
|
||||
flusher.join()
|
||||
|
||||
# Queue should drain, which should involve executing some deletions
|
||||
wait_until(2, 1, lambda: assert_deletion_queue(client, lambda v: v == 0))
|
||||
assert get_deletions_executed() > deletions_executed_pre_failpoint
|
||||
|
||||
finally:
|
||||
# The churn thread doesn't make progress once it blocks on the first wait_completion() call,
|
||||
# so, give it some time to wrap up.
|
||||
log.info("Joining churn workload")
|
||||
churn_while_failpoints_active_thread.join(30)
|
||||
log.info("Joined churn workload")
|
||||
|
||||
# The churn thread doesn't make progress once it blocks on the first wait_completion() call,
|
||||
# so, give it some time to wrap up.
|
||||
churn_while_failpoints_active_thread.join(30)
|
||||
assert not churn_while_failpoints_active_thread.is_alive()
|
||||
assert churn_thread_result[0]
|
||||
|
||||
@@ -364,7 +423,7 @@ def test_remote_storage_upload_queue_retries(
|
||||
env.pageserver.start()
|
||||
client = env.pageserver.http_client()
|
||||
|
||||
client.tenant_attach(tenant_id)
|
||||
env.pageserver.tenant_attach(tenant_id)
|
||||
|
||||
wait_until_tenant_active(client, tenant_id)
|
||||
|
||||
@@ -432,7 +491,6 @@ def test_remote_timeline_client_calls_started_metric(
|
||||
calls_started: Dict[Tuple[str, str], List[int]] = {
|
||||
("layer", "upload"): [0],
|
||||
("index", "upload"): [0],
|
||||
("layer", "delete"): [0],
|
||||
}
|
||||
|
||||
def fetch_calls_started():
|
||||
@@ -502,7 +560,7 @@ def test_remote_timeline_client_calls_started_metric(
|
||||
env.pageserver.start()
|
||||
client = env.pageserver.http_client()
|
||||
|
||||
client.tenant_attach(tenant_id)
|
||||
env.pageserver.tenant_attach(tenant_id)
|
||||
|
||||
wait_until_tenant_active(client, tenant_id)
|
||||
|
||||
@@ -930,4 +988,154 @@ def assert_nothing_to_upload(
|
||||
assert Lsn(detail["last_record_lsn"]) == Lsn(detail["remote_consistent_lsn"])
|
||||
|
||||
|
||||
def get_deletion_queue_depth(ps_http) -> int:
|
||||
"""
|
||||
Queue depth if at least one deletion has been submitted, else None
|
||||
"""
|
||||
submitted = ps_http.get_metric_value("pageserver_deletion_queue_submitted_total")
|
||||
|
||||
if submitted is None:
|
||||
return 0
|
||||
|
||||
executed = ps_http.get_metric_value("pageserver_deletion_queue_executed_total")
|
||||
executed = 0 if executed is None else executed
|
||||
|
||||
depth = submitted - executed
|
||||
assert depth >= 0
|
||||
|
||||
log.info(f"get_deletion_queue_depth: {depth} ({submitted} - {executed})")
|
||||
return int(depth)
|
||||
|
||||
|
||||
def assert_deletion_queue(ps_http, size_fn) -> None:
|
||||
v = get_deletion_queue_depth(ps_http)
|
||||
assert v is not None
|
||||
assert size_fn(v) is True
|
||||
|
||||
|
||||
# TODO Test that we correctly handle GC of files that are stuck in upload queue.
|
||||
|
||||
|
||||
def generate_uploads_and_deletions(
|
||||
env: NeonEnv,
|
||||
*,
|
||||
init: bool = True,
|
||||
tenant_id: Optional[TenantId] = None,
|
||||
timeline_id: Optional[TimelineId] = None,
|
||||
data: Optional[str] = None,
|
||||
):
|
||||
"""
|
||||
Using the environment's default tenant + timeline, generate a load pattern
|
||||
that results in some uploads and some deletions to remote storage.
|
||||
"""
|
||||
|
||||
if tenant_id is None:
|
||||
tenant_id = env.initial_tenant
|
||||
assert tenant_id is not None
|
||||
|
||||
if timeline_id is None:
|
||||
timeline_id = env.initial_timeline
|
||||
assert timeline_id is not None
|
||||
|
||||
ps_http = env.pageserver.http_client()
|
||||
|
||||
with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint:
|
||||
if init:
|
||||
endpoint.safe_psql("CREATE TABLE foo (id INTEGER PRIMARY KEY, val text)")
|
||||
last_flush_lsn_upload(env, endpoint, tenant_id, timeline_id)
|
||||
|
||||
def churn(data):
|
||||
endpoint.safe_psql_many(
|
||||
[
|
||||
f"""
|
||||
INSERT INTO foo (id, val)
|
||||
SELECT g, '{data}'
|
||||
FROM generate_series(1, 20000) g
|
||||
ON CONFLICT (id) DO UPDATE
|
||||
SET val = EXCLUDED.val
|
||||
""",
|
||||
# to ensure that GC can actually remove some layers
|
||||
"VACUUM foo",
|
||||
]
|
||||
)
|
||||
assert tenant_id is not None
|
||||
assert timeline_id is not None
|
||||
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
|
||||
ps_http.timeline_checkpoint(tenant_id, timeline_id)
|
||||
|
||||
# Compaction should generate some GC-elegible layers
|
||||
for i in range(0, 2):
|
||||
churn(f"{i if data is None else data}")
|
||||
|
||||
gc_result = ps_http.timeline_gc(tenant_id, timeline_id, 0)
|
||||
print_gc_result(gc_result)
|
||||
assert gc_result["layers_removed"] > 0
|
||||
|
||||
|
||||
@pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.LOCAL_FS])
|
||||
def test_deletion_queue_recovery(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
remote_storage_kind: RemoteStorageKind,
|
||||
pg_bin: PgBin,
|
||||
):
|
||||
neon_env_builder.enable_remote_storage(
|
||||
remote_storage_kind=remote_storage_kind,
|
||||
test_name="test_deletion_queue_recovery",
|
||||
)
|
||||
|
||||
env = neon_env_builder.init_start(
|
||||
initial_tenant_conf={
|
||||
# small checkpointing and compaction targets to ensure we generate many upload operations
|
||||
"checkpoint_distance": f"{128 * 1024}",
|
||||
"compaction_threshold": "1",
|
||||
"compaction_target_size": f"{128 * 1024}",
|
||||
# no PITR horizon, we specify the horizon when we request on-demand GC
|
||||
"pitr_interval": "0s",
|
||||
# disable background compaction and GC. We invoke it manually when we want it to happen.
|
||||
"gc_period": "0s",
|
||||
"compaction_period": "0s",
|
||||
# create image layers eagerly, so that GC can remove some layers
|
||||
"image_creation_threshold": "1",
|
||||
}
|
||||
)
|
||||
|
||||
ps_http = env.pageserver.http_client()
|
||||
|
||||
# Prevent deletion lists from being executed, to build up some backlog of deletions
|
||||
ps_http.configure_failpoints(
|
||||
[
|
||||
("deletion-queue-before-execute", "return"),
|
||||
]
|
||||
)
|
||||
|
||||
generate_uploads_and_deletions(env)
|
||||
|
||||
# There should be entries in the deletion queue
|
||||
assert_deletion_queue(ps_http, lambda n: n > 0)
|
||||
ps_http.deletion_queue_flush()
|
||||
before_restart_depth = get_deletion_queue_depth(ps_http)
|
||||
|
||||
log.info(f"Restarting pageserver with {before_restart_depth} deletions enqueued")
|
||||
env.pageserver.stop(immediate=True)
|
||||
env.pageserver.start()
|
||||
|
||||
def assert_deletions_submitted(n: int):
|
||||
assert ps_http.get_metric_value("pageserver_deletion_queue_submitted_total") == n
|
||||
|
||||
# After restart, issue a flush to kick the deletion frorntend to do recovery.
|
||||
# It should recover all the operations we submitted before the restart.
|
||||
ps_http.deletion_queue_flush(execute=False)
|
||||
wait_until(20, 0.25, lambda: assert_deletions_submitted(before_restart_depth))
|
||||
|
||||
# The queue should drain through completely if we flush it
|
||||
ps_http.deletion_queue_flush(execute=True)
|
||||
wait_until(10, 1, lambda: assert_deletion_queue(ps_http, lambda n: n == 0))
|
||||
|
||||
# Restart again
|
||||
env.pageserver.stop(immediate=True)
|
||||
env.pageserver.start()
|
||||
|
||||
# No deletion lists should be recovered: this demonstrates that deletion lists
|
||||
# were cleaned up after being executed.
|
||||
time.sleep(1)
|
||||
assert_deletion_queue(ps_http, lambda n: n == 0)
|
||||
|
||||
@@ -47,6 +47,15 @@ def test_tenant_delete_smoke(
|
||||
)
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
env.pageserver.allowed_errors.extend(
|
||||
[
|
||||
# The deletion queue will complain when it encounters simulated S3 errors
|
||||
".*deletion frontend: Failed to write deletion list.*",
|
||||
".*deletion backend: Failed to delete deletion list.*",
|
||||
".*deletion executor: DeleteObjects request failed.*",
|
||||
".*deletion backend: Failed to upload deletion queue header.*",
|
||||
]
|
||||
)
|
||||
|
||||
# lucky race with stopping from flushing a layer we fail to schedule any uploads
|
||||
env.pageserver.allowed_errors.append(
|
||||
@@ -91,7 +100,9 @@ def test_tenant_delete_smoke(
|
||||
|
||||
iterations = poll_for_remote_storage_iterations(remote_storage_kind)
|
||||
|
||||
tenant_delete_wait_completed(ps_http, tenant_id, iterations)
|
||||
# We are running with failures enabled, so this may take some time to make
|
||||
# it through all the remote storage operations required to complete
|
||||
tenant_delete_wait_completed(ps_http, tenant_id, iterations * 10)
|
||||
|
||||
tenant_path = env.tenant_dir(tenant_id=tenant_id)
|
||||
assert not tenant_path.exists()
|
||||
@@ -201,6 +212,17 @@ def test_delete_tenant_exercise_crash_safety_failpoints(
|
||||
]
|
||||
)
|
||||
|
||||
if simulate_failures:
|
||||
env.pageserver.allowed_errors.extend(
|
||||
[
|
||||
# The deletion queue will complain when it encounters simulated S3 errors
|
||||
".*deletion frontend: Failed to write deletion list.*",
|
||||
".*deletion backend: Failed to delete deletion list.*",
|
||||
".*deletion executor: DeleteObjects request failed.*",
|
||||
".*deletion backend: Failed to upload deletion queue header.*",
|
||||
]
|
||||
)
|
||||
|
||||
ps_http = env.pageserver.http_client()
|
||||
|
||||
timeline_id = env.neon_cli.create_timeline("delete", tenant_id=tenant_id)
|
||||
|
||||
@@ -488,7 +488,14 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild
|
||||
# Wait for tenant to finish loading.
|
||||
wait_until_tenant_active(ps_http, tenant_id=env.initial_tenant, iterations=10, period=1)
|
||||
|
||||
wait_timeline_detail_404(ps_http, env.initial_tenant, leaf_timeline_id, iterations=4)
|
||||
# Timeline deletion takes some finite time after startup
|
||||
wait_timeline_detail_404(
|
||||
ps_http,
|
||||
tenant_id=env.initial_tenant,
|
||||
timeline_id=leaf_timeline_id,
|
||||
iterations=20,
|
||||
interval=0.5,
|
||||
)
|
||||
|
||||
assert (
|
||||
not leaf_timeline_path.exists()
|
||||
@@ -534,7 +541,7 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild
|
||||
wait_until(
|
||||
2,
|
||||
0.5,
|
||||
lambda: assert_prefix_empty(neon_env_builder),
|
||||
lambda: assert_prefix_empty(neon_env_builder, prefix="/tenants"),
|
||||
)
|
||||
|
||||
|
||||
@@ -688,7 +695,7 @@ def test_delete_timeline_client_hangup(neon_env_builder: NeonEnvBuilder):
|
||||
wait_until(50, 0.1, first_request_finished)
|
||||
|
||||
# check that the timeline is gone
|
||||
wait_timeline_detail_404(ps_http, env.initial_tenant, child_timeline_id, iterations=2)
|
||||
wait_timeline_detail_404(ps_http, env.initial_tenant, child_timeline_id, iterations=4)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
@@ -772,7 +779,11 @@ def test_timeline_delete_works_for_remote_smoke(
|
||||
|
||||
# for some reason the check above doesnt immediately take effect for the below.
|
||||
# Assume it is mock server inconsistency and check twice.
|
||||
wait_until(2, 0.5, lambda: assert_prefix_empty(neon_env_builder))
|
||||
wait_until(
|
||||
2,
|
||||
0.5,
|
||||
lambda: assert_prefix_empty(neon_env_builder, "/tenants"),
|
||||
)
|
||||
|
||||
|
||||
def test_delete_orphaned_objects(
|
||||
@@ -827,6 +838,8 @@ def test_delete_orphaned_objects(
|
||||
reason = timeline_info["state"]["Broken"]["reason"]
|
||||
assert reason.endswith(f"failpoint: {failpoint}"), reason
|
||||
|
||||
ps_http.deletion_queue_flush(execute=True)
|
||||
|
||||
for orphan in orphans:
|
||||
assert not orphan.exists()
|
||||
assert env.pageserver.log_contains(
|
||||
|
||||
Reference in New Issue
Block a user