diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs index a65e39dcd0..447d459435 100644 --- a/pageserver/src/deletion_queue.rs +++ b/pageserver/src/deletion_queue.rs @@ -3,20 +3,16 @@ mod executor; mod frontend; use std::collections::HashMap; -use std::path::PathBuf; + use std::sync::Arc; use std::time::Duration; use crate::control_plane_client::ControlPlaneGenerationsApi; use crate::metrics::DELETION_QUEUE_SUBMITTED; use crate::tenant::remote_timeline_client::remote_layer_path; -use crate::tenant::remote_timeline_client::remote_timeline_path; -use anyhow::Context; -use hex::FromHex; + use remote_storage::{GenericRemoteStorage, RemotePath}; -use serde::Deserialize; -use serde::Serialize; -use serde_with::serde_as; + use thiserror::Error; use tokio; use tokio_util::sync::CancellationToken; @@ -146,238 +142,8 @@ pub struct DeletionQueueClient { lsn_table: Arc>, } -#[derive(Debug, Serialize, Deserialize)] -struct TenantDeletionList { - /// For each Timeline, a list of key fragments to append to the timeline remote path - /// when reconstructing a full key - #[serde(serialize_with = "to_hex_map", deserialize_with = "from_hex_map")] - timelines: HashMap>, - - /// The generation in which this deletion was emitted: note that this may not be the - /// same as the generation of any layers being deleted. The generation of the layer - /// has already been absorbed into the keys in `objects` - generation: Generation, -} - -impl TenantDeletionList { - pub(crate) fn len(&self) -> usize { - self.timelines.values().map(|v| v.len()).sum() - } -} - -/// For HashMaps using a `hex` compatible key, where we would like to encode the key as a string -fn to_hex_map(input: &HashMap, serializer: S) -> Result -where - S: serde::Serializer, - V: Serialize, - I: AsRef<[u8]>, -{ - let transformed = input.iter().map(|(k, v)| (hex::encode(k), v.clone())); - - transformed - .collect::>() - .serialize(serializer) -} - -/// For HashMaps using a FromHex key, where we would like to decode the key -fn from_hex_map<'de, D, V, I>(deserializer: D) -> Result, D::Error> -where - D: serde::de::Deserializer<'de>, - V: Deserialize<'de>, - I: FromHex + std::hash::Hash + Eq, -{ - let hex_map = HashMap::::deserialize(deserializer)?; - hex_map - .into_iter() - .map(|(k, v)| { - I::from_hex(k) - .map(|k| (k, v)) - .map_err(|_| serde::de::Error::custom("Invalid hex ID")) - }) - .collect() -} - -#[serde_as] -#[derive(Debug, Serialize, Deserialize)] -struct DeletionList { - /// Serialization version, for future use - version: u8, - - /// Used for constructing a unique key for each deletion list we write out. - sequence: u64, - - /// To avoid repeating tenant/timeline IDs in every key, we store keys in - /// nested HashMaps by TenantTimelineID. Each Tenant only appears once - /// with one unique generation ID: if someone tries to push a second generation - /// ID for the same tenant, we will start a new DeletionList. - #[serde(serialize_with = "to_hex_map", deserialize_with = "from_hex_map")] - tenants: HashMap, - - /// Avoid having to walk `tenants` to calculate the number of keys in - /// the nested deletion lists - size: usize, - - /// Set to true when the list has undergone validation with the control - /// plane and the remaining contents of `tenants` are valid. A list may - /// also be implicitly marked valid by DeletionHeader.validated_sequence - /// advancing to >= DeletionList.sequence - #[serde(default)] - #[serde(skip_serializing_if = "std::ops::Not::not")] - validated: bool, -} - -#[serde_as] -#[derive(Debug, Serialize, Deserialize)] -struct DeletionHeader { - /// Serialization version, for future use - version: u8, - - /// The highest sequence number (inclusive) that has been validated. All deletion - /// lists on disk with a sequence <= this value are safe to execute. - validated_sequence: u64, -} - -impl DeletionHeader { - const VERSION_LATEST: u8 = 1; - - fn new(validated_sequence: u64) -> Self { - Self { - version: Self::VERSION_LATEST, - validated_sequence, - } - } - - async fn save(&self, conf: &'static PageServerConf) -> anyhow::Result<()> { - debug!("Saving deletion list header {:?}", self); - let header_bytes = serde_json::to_vec(self).context("serialize deletion header")?; - let header_path = conf.deletion_header_path(); - - tokio::fs::write(&header_path, header_bytes) - .await - .map_err(|e| anyhow::anyhow!(e)) - } -} - -impl DeletionList { - const VERSION_LATEST: u8 = 1; - fn new(sequence: u64) -> Self { - Self { - version: Self::VERSION_LATEST, - sequence, - tenants: HashMap::new(), - size: 0, - validated: false, - } - } - - fn drain(&mut self) -> Self { - let mut tenants = HashMap::new(); - std::mem::swap(&mut self.tenants, &mut tenants); - let other = Self { - version: Self::VERSION_LATEST, - sequence: self.sequence, - tenants, - size: self.size, - validated: self.validated, - }; - self.size = 0; - other - } - - fn is_empty(&self) -> bool { - self.tenants.is_empty() - } - - fn len(&self) -> usize { - self.size - } - - /// Returns true if the push was accepted, false if the caller must start a new - /// deletion list. - fn push( - &mut self, - tenant: &TenantId, - timeline: &TimelineId, - generation: Generation, - objects: &mut Vec, - ) -> bool { - if objects.is_empty() { - // Avoid inserting an empty TimelineDeletionList: this preserves the property - // that if we have no keys, then self.objects is empty (used in Self::is_empty) - return true; - } - - let tenant_entry = self - .tenants - .entry(*tenant) - .or_insert_with(|| TenantDeletionList { - timelines: HashMap::new(), - generation, - }); - - if tenant_entry.generation != generation { - // Only one generation per tenant per list: signal to - // caller to start a new list. - return false; - } - - let timeline_entry = tenant_entry - .timelines - .entry(*timeline) - .or_insert_with(Vec::new); - - let timeline_remote_path = remote_timeline_path(tenant, timeline); - - self.size += objects.len(); - timeline_entry.extend(objects.drain(..).map(|p| { - p.strip_prefix(&timeline_remote_path) - .expect("Timeline paths always start with the timeline prefix") - .to_string_lossy() - .to_string() - })); - true - } - - fn drain_paths(&mut self) -> Vec { - let mut tenants = HashMap::new(); - std::mem::swap(&mut tenants, &mut self.tenants); - - let mut result = Vec::new(); - for (tenant, tenant_deletions) in tenants.into_iter() { - for (timeline, timeline_layers) in tenant_deletions.timelines.into_iter() { - let timeline_remote_path = remote_timeline_path(&tenant, &timeline); - result.extend( - timeline_layers - .into_iter() - .map(|l| timeline_remote_path.join(&PathBuf::from(l))), - ); - } - } - - result - } - - async fn save(&self, conf: &'static PageServerConf) -> anyhow::Result<()> { - let path = conf.deletion_list_path(self.sequence); - - let bytes = serde_json::to_vec(self).expect("Failed to serialize deletion list"); - tokio::fs::write(&path, &bytes).await?; - tokio::fs::File::open(&path).await?.sync_all().await?; - Ok(()) - } -} - -impl std::fmt::Display for DeletionList { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!( - f, - "DeletionList", - self.sequence, - self.tenants.len(), - self.size - ) - } -} +mod deletion_list; +use deletion_list::*; struct PendingLsn { projected: Lsn, diff --git a/pageserver/src/deletion_queue/deletion_list.rs b/pageserver/src/deletion_queue/deletion_list.rs new file mode 100644 index 0000000000..039e71e2a8 --- /dev/null +++ b/pageserver/src/deletion_queue/deletion_list.rs @@ -0,0 +1,259 @@ +use std::path::PathBuf; + +use crate::tenant::remote_timeline_client::remote_timeline_path; + +use anyhow::Context; +use remote_storage::RemotePath; + +use tracing::debug; + +use crate::config::PageServerConf; + +use utils::id::TenantId; + +use serde_with::serde_as; + +use hex::FromHex; + +use serde::Deserialize; + +use serde::Serialize; + +use utils::generation::Generation; + +use utils::id::TimelineId; + +use std::collections::HashMap; + +#[derive(Debug, Serialize, Deserialize)] +pub(crate) struct TenantDeletionList { + /// For each Timeline, a list of key fragments to append to the timeline remote path + /// when reconstructing a full key + #[serde(serialize_with = "to_hex_map", deserialize_with = "from_hex_map")] + pub(crate) timelines: HashMap>, + + /// The generation in which this deletion was emitted: note that this may not be the + /// same as the generation of any layers being deleted. The generation of the layer + /// has already been absorbed into the keys in `objects` + pub(crate) generation: Generation, +} + +impl TenantDeletionList { + pub(crate) fn len(&self) -> usize { + self.timelines.values().map(|v| v.len()).sum() + } +} + +/// For HashMaps using a `hex` compatible key, where we would like to encode the key as a string +pub(crate) fn to_hex_map(input: &HashMap, serializer: S) -> Result +where + S: serde::Serializer, + V: Serialize, + I: AsRef<[u8]>, +{ + let transformed = input.iter().map(|(k, v)| (hex::encode(k), v.clone())); + + transformed + .collect::>() + .serialize(serializer) +} + +/// For HashMaps using a FromHex key, where we would like to decode the key +pub(crate) fn from_hex_map<'de, D, V, I>(deserializer: D) -> Result, D::Error> +where + D: serde::de::Deserializer<'de>, + V: Deserialize<'de>, + I: FromHex + std::hash::Hash + Eq, +{ + let hex_map = HashMap::::deserialize(deserializer)?; + hex_map + .into_iter() + .map(|(k, v)| { + I::from_hex(k) + .map(|k| (k, v)) + .map_err(|_| serde::de::Error::custom("Invalid hex ID")) + }) + .collect() +} + +#[serde_as] +#[derive(Debug, Serialize, Deserialize)] +pub(crate) struct DeletionList { + /// Serialization version, for future use + version: u8, + + /// Used for constructing a unique key for each deletion list we write out. + pub(crate) sequence: u64, // FIXME: make private + + /// To avoid repeating tenant/timeline IDs in every key, we store keys in + /// nested HashMaps by TenantTimelineID. Each Tenant only appears once + /// with one unique generation ID: if someone tries to push a second generation + /// ID for the same tenant, we will start a new DeletionList. + #[serde(serialize_with = "to_hex_map", deserialize_with = "from_hex_map")] + pub(crate) tenants: HashMap, + + /// Avoid having to walk `tenants` to calculate the number of keys in + /// the nested deletion lists + size: usize, + + /// Set to true when the list has undergone validation with the control + /// plane and the remaining contents of `tenants` are valid. A list may + /// also be implicitly marked valid by DeletionHeader.validated_sequence + /// advancing to >= DeletionList.sequence + #[serde(default)] + #[serde(skip_serializing_if = "std::ops::Not::not")] + pub(crate) validated: bool, // FIXME: make private +} + +#[serde_as] +#[derive(Debug, Serialize, Deserialize)] +pub(crate) struct DeletionHeader { + /// Serialization version, for future use + version: u8, + + /// The highest sequence number (inclusive) that has been validated. All deletion + /// lists on disk with a sequence <= this value are safe to execute. + pub(crate) validated_sequence: u64, // FIXME: make private +} + +impl DeletionHeader { + pub(crate) const VERSION_LATEST: u8 = 1; + + pub(crate) fn new(validated_sequence: u64) -> Self { + Self { + version: Self::VERSION_LATEST, + validated_sequence, + } + } + + pub(crate) async fn save(&self, conf: &'static PageServerConf) -> anyhow::Result<()> { + debug!("Saving deletion list header {:?}", self); + let header_bytes = serde_json::to_vec(self).context("serialize deletion header")?; + let header_path = conf.deletion_header_path(); + + tokio::fs::write(&header_path, header_bytes) + .await + .map_err(|e| anyhow::anyhow!(e)) + } +} + +impl DeletionList { + pub(crate) const VERSION_LATEST: u8 = 1; + pub(crate) fn new(sequence: u64) -> Self { + Self { + version: Self::VERSION_LATEST, + sequence, + tenants: HashMap::new(), + size: 0, + validated: false, + } + } + + pub(crate) fn drain(&mut self) -> Self { + let mut tenants = HashMap::new(); + std::mem::swap(&mut self.tenants, &mut tenants); + let other = Self { + version: Self::VERSION_LATEST, + sequence: self.sequence, + tenants, + size: self.size, + validated: self.validated, + }; + self.size = 0; + other + } + + pub(crate) fn is_empty(&self) -> bool { + self.tenants.is_empty() + } + + pub(crate) fn len(&self) -> usize { + self.size + } + + /// Returns true if the push was accepted, false if the caller must start a new + /// deletion list. + pub(crate) fn push( + &mut self, + tenant: &TenantId, + timeline: &TimelineId, + generation: Generation, + objects: &mut Vec, + ) -> bool { + if objects.is_empty() { + // Avoid inserting an empty TimelineDeletionList: this preserves the property + // that if we have no keys, then self.objects is empty (used in Self::is_empty) + return true; + } + + let tenant_entry = self + .tenants + .entry(*tenant) + .or_insert_with(|| TenantDeletionList { + timelines: HashMap::new(), + generation, + }); + + if tenant_entry.generation != generation { + // Only one generation per tenant per list: signal to + // caller to start a new list. + return false; + } + + let timeline_entry = tenant_entry + .timelines + .entry(*timeline) + .or_insert_with(Vec::new); + + let timeline_remote_path = remote_timeline_path(tenant, timeline); + + self.size += objects.len(); + timeline_entry.extend(objects.drain(..).map(|p| { + p.strip_prefix(&timeline_remote_path) + .expect("Timeline paths always start with the timeline prefix") + .to_string_lossy() + .to_string() + })); + true + } + + pub(crate) fn drain_paths(&mut self) -> Vec { + let mut tenants = HashMap::new(); + std::mem::swap(&mut tenants, &mut self.tenants); + + let mut result = Vec::new(); + for (tenant, tenant_deletions) in tenants.into_iter() { + for (timeline, timeline_layers) in tenant_deletions.timelines.into_iter() { + let timeline_remote_path = remote_timeline_path(&tenant, &timeline); + result.extend( + timeline_layers + .into_iter() + .map(|l| timeline_remote_path.join(&PathBuf::from(l))), + ); + } + } + + result + } + + pub(crate) async fn save(&self, conf: &'static PageServerConf) -> anyhow::Result<()> { + let path = conf.deletion_list_path(self.sequence); + + let bytes = serde_json::to_vec(self).expect("Failed to serialize deletion list"); + tokio::fs::write(&path, &bytes).await?; + tokio::fs::File::open(&path).await?.sync_all().await?; + Ok(()) + } +} + +impl std::fmt::Display for DeletionList { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "DeletionList", + self.sequence, + self.tenants.len(), + self.size + ) + } +}