{
// Main Hyper HTTP server function that runs it and blocks waiting on it forever.
#[tokio::main]
async fn serve(port: u16, state: Arc) {
- let addr = SocketAddr::from(([0, 0, 0, 0], port));
+ // this usually binds to both IPv4 and IPv6 on linux
+ // see e.g. https://github.com/rust-lang/rust/pull/34440
+ let addr = SocketAddr::new(IpAddr::from(Ipv6Addr::UNSPECIFIED), port);
let make_service = make_service_fn(move |_conn| {
let state = state.clone();
diff --git a/compute_tools/src/params.rs b/compute_tools/src/params.rs
index 0ce01ff478..4ccb403ca6 100644
--- a/compute_tools/src/params.rs
+++ b/compute_tools/src/params.rs
@@ -6,4 +6,4 @@ pub const DEFAULT_LOG_LEVEL: &str = "info";
// https://www.postgresql.org/docs/15/auth-password.html
//
// So it's safe to set md5 here, as `control-plane` anyway uses SCRAM for all roles.
-pub const PG_HBA_ALL_MD5: &str = "host\tall\t\tall\t\t0.0.0.0/0\t\tmd5";
+pub const PG_HBA_ALL_MD5: &str = "host\tall\t\tall\t\tall\t\tmd5";
diff --git a/docs/rfcs/027-crash-consistent-layer-map-through-index-part.md b/docs/rfcs/027-crash-consistent-layer-map-through-index-part.md
new file mode 100644
index 0000000000..2c6b46eabe
--- /dev/null
+++ b/docs/rfcs/027-crash-consistent-layer-map-through-index-part.md
@@ -0,0 +1,281 @@
+
+# Crash-Consistent Layer Map Updates By Leveraging `index_part.json`
+
+* Created on: Aug 23, 2023
+* Author: Christian Schwarz
+
+## Summary
+
+This RFC describes a simple scheme to make layer map updates crash consistent by leveraging the `index_part.json` in remote storage.
+Without such a mechanism, crashes can induce certain edge cases in which broadly held assumptions about system invariants don't hold.
+
+## Motivation
+
+### Background
+
+We can currently easily make complex, atomic updates to the layer map by means of an RwLock.
+If we crash or restart pageserver, we reconstruct the layer map from:
+1. local timeline directory contents
+2. remote `index_part.json` contents.
+
+The function that is responsible for this is called `Timeline::load_layer_map()`.
+The reconciliation process's behavior is the following:
+* Local-only files will become part of the layer map as local-only layers and will be rescheduled for upload.
+* For a file name that, by its name, is present locally and in the remote `index_part.json`, but where the local file has a different size (future: checksum) than the remote file, we will delete the local file and leave the remote file as a `RemoteLayer` in the layer map.
+
+### The Problem
+
+There are cases where we need to make an atomic update to the layer map that involves **more than one layer**.
+The best example is compaction, where we need to insert the L1 layers generated from the L0 layers, and remove the L0 layers.
+As stated above, making the update to the layer map in an atomic way is trivial.
+But, there is no system call API to make an atomic update to a directory that involves more than one file rename and deletion.
+Currently, we issue the system calls one by one and hope we don't crash.
+
+What happens if we crash and restart in the middle of that system call sequence?
+We will reconstruct the layer map according to the reconciliation process, taking as input whatever transitory state the timeline directory ended up in.
+
+We cannot roll back or complete the timeline directory update during which we crashed, because we keep no record of the changes we plan to make.
+
+### Problem's Implications For Compaction
+
+The implications of the above are primarily problematic for compaction.
+Specifically, the part of it that compacts L0 layers into L1 layers.
+
+Remember that compaction takes a set of L0 layers and reshuffles the delta records in them into L1 layer files.
+Once the L1 layer files are written to disk, it atomically removes the L0 layers from the layer map and adds the L1 layers to the layer map.
+It then deletes the L0 layers locally, and schedules an upload of the L1 layers and an updated index part.
+
+If we crash before deleting L0s, but after writing out L1s, the next compaction after restart will re-digest the L0s and produce new L1s.
+This means the compaction after restart will **overwrite** the previously written L1s.
+Currently we also schedule an S3 upload of the overwritten L1.
+
+If the compaction algorithm doesn't change between the two compaction runs, is deterministic, and uses the same set of L0s as input, then the second run will produce identical L1s and the overwrites will go unnoticed.
+
+*However*:
+1. the file size of the overwritten L1s may not be identical, and
+2. the bit pattern of the overwritten L1s may not be identical, and,
+3. in the future, we may want to make the compaction code non-deterministic, influenced by past access patterns, or otherwise change it, resulting in L1 overwrites with a different set of delta records than before the overwrite
+
+The items above are a problem for the [split-brain protection RFC](https://github.com/neondatabase/neon/pull/4919) because it assumes that layer files in S3 are only ever deleted, but never replaced (overPUTted).
+
+For example, if an unresponsive node A becomes active again after control plane has relocated the tenant to a new node B, the node A may overwrite some L1s.
+But node B based its world view on the version of node A's `index_part.json` from _before_ the overwrite.
+That earlier `index_part.json` contained the file size of the pre-overwrite L1.
+If the overwritten L1 has a different file size, node B will refuse to read data from the overwritten L1.
+Effectively, the data in the L1 has become inaccessible to node B.
+If node B already uploaded an index part itself, all subsequent attachments will use node B's index part, and run into the same problem.
+
+If we ever introduce checksums instead of checking just the file size, then a mismatching bit pattern (2) will cause similar problems.
+
+In case of (1) and (2), where we know that the logical content of the layers is still the same, we can recover by manually patching the `index_part.json` of the new node to the overwritten L1's file size / checksum.
+
+But if (3) ever happens, the logical content may be different, and, we could have truly lost data.
+
+Given the above considerations, we should avoid making correctness of split-brain protection dependent on overwrites preserving _logical_ layer file contents.
+**It is a much cleaner separation of concerns to require that layer files are truly immutable in S3, i.e., PUT once and then only DELETEd, never overwritten (overPUTted).**
+
+## Design
+
+Instead of reconciling a layer map from local timeline directory contents and remote index part, this RFC proposes to view the remote index part as authoritative during timeline load.
+Local layer files will be recognized if they match what's listed in remote index part, and removed otherwise.
+
+During **timeline load**, the only thing that matters is the remote index part content.
+Essentially, timeline load becomes much like attach, except we don't need to prefix-list the remote timelines.
+The local timeline dir's `metadata` file does not matter.
+The layer files in the local timeline dir are seen as a nice-to-have cache of layer files that are in the remote index part.
+Any layer files in the local timeline dir that aren't in the remote index part are removed during startup.
+The `Timeline::load_layer_map()` no longer "merges" local timeline dir contents with the remote index part.
+Instead, it treats the remote index part as the authoritative layer map.
+If the local timeline dir contains a layer that is in the remote index part, that's nice, and we'll re-use it if its file size (and in the future, checksum) matches what's stated in the index part.
+If it doesn't match, we remove the file from the local timeline dir.
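A minimal sketch of the load-time rule described above, assuming a simplified model in which both the remote index part and the local timeline dir are maps from layer file name to file size. `LoadPlan` and `plan_timeline_load` are hypothetical names for illustration and are not the pageserver's actual API.

```rust
use std::collections::BTreeMap;

/// Outcome of comparing the local timeline dir against the remote index part.
#[derive(Debug, Default, PartialEq)]
pub struct LoadPlan {
    /// Local files whose size matches the index part; reused as a cache.
    pub reuse: Vec<String>,
    /// Local files to delete: absent from the index part, or size mismatch.
    pub remove_local: Vec<String>,
    /// Layers listed in the index part that have no usable local copy.
    pub remote_only: Vec<String>,
}

/// Hypothetical helper: the index part is authoritative, local files are a cache.
/// Both maps go from layer file name to file size in bytes.
pub fn plan_timeline_load(
    index_part: &BTreeMap<String, u64>,
    local_dir: &BTreeMap<String, u64>,
) -> LoadPlan {
    let mut plan = LoadPlan::default();
    for (name, remote_size) in index_part {
        match local_dir.get(name) {
            // Same name and same size: keep the local file.
            Some(local_size) if local_size == remote_size => plan.reuse.push(name.clone()),
            // Same name but different size: drop the local file, keep the remote layer.
            Some(_) => {
                plan.remove_local.push(name.clone());
                plan.remote_only.push(name.clone());
            }
            // Not present locally: would be downloaded on demand.
            None => plan.remote_only.push(name.clone()),
        }
    }
    // Anything local that the index part does not list is removed.
    for name in local_dir.keys() {
        if !index_part.contains_key(name) {
            plan.remove_local.push(name.clone());
        }
    }
    plan
}
```

Note that the sketch never adds a local-only file to the resulting layer map, which is the key difference from the merge-style reconciliation described in the Background section.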
+
+After load, **at runtime**, nothing changes compared to what we did before this RFC.
+The procedure for single- and multi-object changes is reproduced here for reference:
+* For any new layers that the change adds:
+ * Write them to a temporary location.
+ * While holding layer map lock:
+ * Move them to the final location.
+ * Insert into layer map.
+* Make the S3 changes.
+ We won't reproduce the remote timeline client method calls here because these are subject to change.
+ Instead we reproduce the sequence of s3 changes that must result for a given single-/multi-object change:
+ * PUT layer files inserted by the change.
+ * PUT an index part that has insertions and deletions of the change.
+ * DELETE the layer files that are deleted by the change.
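The ordering of the S3 changes listed above can be sketched as a pure function. This is an illustration under assumptions: `S3Op`, `LayerChange`, and `s3_ops_for_change` are hypothetical names, layers are identified by plain strings, and the index part is reduced to a list of layer names.

```rust
/// One remote-storage operation; the names are illustrative only.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum S3Op {
    PutLayer(String),
    PutIndexPart { layers: Vec<String> },
    DeleteLayer(String),
}

/// Hypothetical description of a single- or multi-object layer map change.
pub struct LayerChange {
    pub inserted: Vec<String>,
    pub deleted: Vec<String>,
}

/// Produce the S3 operations for `change` in the order the RFC requires.
pub fn s3_ops_for_change(current_index: &[String], change: &LayerChange) -> Vec<S3Op> {
    let mut ops = Vec::new();

    // 1. PUT the new layer files first, so the index part never lists a missing object.
    for layer in &change.inserted {
        ops.push(S3Op::PutLayer(layer.clone()));
    }

    // 2. A single index part PUT carries both the insertions and the deletions.
    let mut layers: Vec<String> = current_index
        .iter()
        .filter(|l| !change.deleted.contains(*l))
        .cloned()
        .collect();
    layers.extend(change.inserted.iter().cloned());
    ops.push(S3Op::PutIndexPart { layers });

    // 3. DELETE the removed layer files last; this step may be deferred arbitrarily.
    for layer in &change.deleted {
        ops.push(S3Op::DeleteLayer(layer.clone()));
    }
    ops
}
```

For an L0 => L1 compaction, `inserted` would hold the new L1 layers and `deleted` the consumed L0 layers.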
+
+Note that it is safe for the DELETE to be deferred arbitrarily.
+* If it never happens, we leak the object, but, that's not a correctness concern.
+* As of #4938, we don't schedule the remote timeline client operation for deletion immediately, but, only when we drop the `LayerInner`.
+* With the [split-brain protection RFC](https://github.com/neondatabase/neon/pull/4919), the deletions will be written to deletion queue for processing when it's safe to do so (see the RFC for details).
+
+## How This Solves The Problem
+
+If we crash before we've finished the S3 changes, then timeline load will reset layer map to the state that's in the S3 index part.
+The S3 change sequence above is obviously crash-consistent.
+If we crash before the index part PUT, then we leak the inserted layer files to S3.
+If we crash after the index part PUT, we leak the to-be-DELETEd layer files to S3.
+Leaking is fine, it's a pre-existing condition and not addressed in this RFC.
+
+Multi-object changes that previously created and removed files in timeline dir are now atomic because the layer map updates are atomic and crash consistent:
+* atomic layer map update at runtime, currently by using an RwLock in write mode
+* atomic `index_part.json` update in S3, as per guarantee that S3 PUT is atomic
+* local timeline dir state:
+ * irrelevant for layer map content => irrelevant for atomic updates / crash consistency
+ * if we crash after index part PUT, local layer files will be used, so no on-demand downloads are needed for them
+ * if we crash before index part PUT, local layer files will be deleted
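The crash-consistency argument can be checked with a toy model that crashes after every prefix of the S3 change sequence. This is a sketch under assumptions: `Remote`, `Op`, and `states_after_each_crash_point` are hypothetical names, and remote storage is reduced to a set of object names plus the layer list of the latest index part.

```rust
use std::collections::BTreeSet;

/// Illustrative remote-storage operations.
#[derive(Debug, Clone)]
pub enum Op {
    PutLayer(&'static str),
    PutIndex(Vec<&'static str>),
    DeleteLayer(&'static str),
}

/// Toy model of remote storage: layer objects plus the current index part.
#[derive(Debug, Clone, Default)]
pub struct Remote {
    pub objects: BTreeSet<&'static str>,
    pub index: Vec<&'static str>,
}

impl Remote {
    pub fn apply(&mut self, op: &Op) {
        match op {
            Op::PutLayer(name) => {
                self.objects.insert(*name);
            }
            Op::PutIndex(layers) => self.index = layers.clone(),
            Op::DeleteLayer(name) => {
                self.objects.remove(name);
            }
        }
    }

    /// The invariant timeline load relies on: every indexed layer exists.
    pub fn index_is_consistent(&self) -> bool {
        self.index.iter().all(|l| self.objects.contains(l))
    }

    /// Objects that exist but are not referenced by the index part.
    pub fn leaked(&self) -> Vec<&'static str> {
        self.objects
            .iter()
            .filter(|o| !self.index.contains(*o))
            .copied()
            .collect()
    }
}

/// Remote state if we crash after 0, 1, ..., ops.len() completed operations.
pub fn states_after_each_crash_point(start: &Remote, ops: &[Op]) -> Vec<Remote> {
    let mut states = vec![start.clone()];
    let mut cur = start.clone();
    for op in ops {
        cur.apply(op);
        states.push(cur.clone());
    }
    states
}
```

In this model, with the sequence "PUT layers, PUT index part, DELETE layers", every crash point leaves `index_is_consistent()` true; the only effect of a crash is a non-empty `leaked()` list, matching the leak cases described above.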
+
+## Trade-Offs
+
+### Fundamental
+
+If we crash before finishing the index part PUT, we lose all the work that hasn't reached the S3 `index_part.json`:
+* wal ingest: we lose not-yet-uploaded L0s; load on the **safekeepers** + work for pageserver
+* compaction: we lose the entire compaction iteration work; need to re-do it again
+* gc: no change to what we have today
+
+If the work is still deemed necessary after restart, the restarted pageserver will re-do this work.
+The amount of work to be re-done is capped by how far the S3 changes lag behind the local changes.
+Assuming upload queue allows for unlimited queue depth (that's what it does today), this means:
+* on-demand downloads that were needed to do the work: are likely still present, not lost
+* wal ingest: currently unbounded
+* L0 => L1 compaction: CPU time proportional to `O(sum(L0 size))` and upload work proportional to `O()`
+ * Compaction threshold is 10 L0s and each L0 can be up to 256M in size. Target size for L1 is 128M.
+ * In practice, most L0s are tiny due to the 10-minute `DEFAULT_CHECKPOINT_TIMEOUT`.
+* image layer generation: CPU time `O(sum(input data))` + upload work `O(sum(new image layer size))`
+ * I have no intuition how expensive / long-running it is in reality.
+* gc: `update_gc_info` work (not substantial, AFAIK)
+
+To limit the amount of lost upload work, and ingest work, we can limit the upload queue depth (see suggestions in the next sub-section).
+However, to limit the amount of lost CPU work, we would need a way to make the compaction/image-layer-generation algorithms interruptible & resumable.
+We aren't there yet; the need for it is tracked in [#4580](https://github.com/neondatabase/neon/issues/4580).
+However, this RFC is not constraining the design space either.
+
+### Practical
+
+#### Pageserver Restarts
+
+Pageserver crashes are very rare; it would likely be acceptable to re-do the lost work in that case.
+However, regular pageserver restarts happen frequently, e.g., during weekly deploys.
+
+In general, pageserver restart faces the problem of tenants that "take too long" to shut down.
+They are a problem because other tenants that shut down quickly are unavailable while we wait for the slow tenants to shut down.
+We currently allot 10 seconds for graceful shutdown until we SIGKILL the pageserver process (as per `pageserver.service` unit file).
+A longer budget would expose tenants that are done early to a longer downtime.
+A short budget would risk throwing away more work that'd have to be re-done after restart.
+
+In the context of this RFC, killing the process would mean losing the work that hasn't made it to S3.
+We can mitigate this problem as follows:
+0. initially, by accepting that we need to do the work again
+1. short-term, introducing measures to cap the amount of in-flight work:
+
+ - cap upload queue length, use backpressure to slow down compaction
+ - disabling compaction/image-layer-generation X minutes before `systemctl restart pageserver`
+ - introducing a read-only shutdown state for tenants that are fast to shut down;
+ that state would be equivalent to the state of a tenant in hot standby / readonly mode.
+
+2. mid term, by not restarting pageserver in place, but using [*seamless tenant migration*](https://github.com/neondatabase/neon/pull/5029) to drain a pageserver's tenants before we restart it.
+
+#### `disk_consistent_lsn` can go backwards
+
+`disk_consistent_lsn` can go backwards across restarts if we crash before we've finished the index part PUT.
+Nobody should care about it, because the only thing that matters is `remote_consistent_lsn`.
+Compute certainly doesn't care about `disk_consistent_lsn`.
+
+
+## Side-Effects Of This Design
+
+* local `metadata` is basically reduced to a cache of which timelines exist for this tenant; i.e., we can avoid a `ListObjects` request for a tenant's timelines during tenant load.
+
+## Limitations
+
+Multi-object changes that span multiple timelines aren't covered by this RFC.
+That's fine because we currently don't need them, as evidenced by the absence
+of a Pageserver operation that holds multiple timelines' layer map lock at a time.
+
+## Impacted components
+
+Primarily pageservers.
+
+Safekeepers will experience more load when we need to re-ingest WAL because we've thrown away work.
+No changes to safekeepers are needed.
+
+## Alternatives considered
+
+### Alternative 1: WAL
+
+We could have a local WAL for timeline dir changes, as proposed here https://github.com/neondatabase/neon/issues/4418 and partially implemented here https://github.com/neondatabase/neon/pull/4422 .
+The WAL would be used to
+1. make multi-object changes atomic
+2. replace `reconcile_with_remote()` reconciliation: scheduling of layer upload would be part of WAL replay.
+
+The WAL is appealing in a local-first world, but, it's much more complex than the design described above:
+* New on-disk state to get right.
+* Forward- and backward-compatibility development costs in the future.
+
+### Alternative 2: Flow Everything Through `index_part.json`
+
+We could have gone to the other extreme and **only** update the layer map whenever we've PUT `index_part.json`.
+I.e., layer map would always be the last-persisted S3 state.
+That's axiomatically beautiful, not least because it fully separates the layer file production and consumption path (=> [layer file spreading proposal](https://www.notion.so/neondatabase/One-Pager-Layer-File-Spreading-Christian-eb6b64182a214e11b3fceceee688d843?pvs=4)).
+And it might make hot standbys / read-only pageservers less of a special case in the future.
+
+But, I have some uncertainties with regard to WAL ingestion, because it needs to be able to do some reads for the logical size feedback to safekeepers.
+
+And it's silly that we wouldn't be able to use the results of compaction or image layer generation before we're done with the upload.
+
+Lastly, a temporarily clogged-up upload queue (e.g. S3 is down) shouldn't immediately render ingestion unavailable.
+
+### Alternative 3: Sequence Numbers For Layers
+
+Instead of what's proposed in this RFC, we could use unique numbers to identify layer files:
+
+```
+# before
+tenants/$tenant/timelines/$timeline/$key_and_lsn_range
+# after
+tenants/$tenant/timelines/$timeline/$layer_file_id-$key_and_lsn_range
+```
+
+To guarantee uniqueness, the unique number is a sequence number, stored in `index_part.json`.
+
+This alternative does not solve atomic layer map updates.
+In our crash-during-compaction scenario above, the compaction run after the crash will not overwrite the L1s, but write/PUT new files with new sequence numbers.
+In fact, this alternative makes it worse because the data is now duplicated in the not-overwritten and overwritten L1 layer files.
+We'd need to write a deduplication pass that checks if perfectly overlapping layers have identical contents.
+
+However, this alternative is appealing because it systematically prevents overwrites at a lower level than this RFC.
+
+So, this alternative is sufficient for the needs of the split-brain safety RFC (immutable layer files locally and in S3).
+But it doesn't solve the problems with crash-during-compaction outlined earlier in this RFC, and in fact, makes it much more acute.
+The proposed design in this RFC addresses both.
+
+So, if this alternative sounds appealing, we should implement the proposal in this RFC first, then implement this alternative on top.
+That way, we avoid a phase where the crash-during-compaction problem is acute.
+
+## Related issues
+
+- https://github.com/neondatabase/neon/issues/4749
+- https://github.com/neondatabase/neon/issues/4418
+ - https://github.com/neondatabase/neon/pull/4422
+- https://github.com/neondatabase/neon/issues/5077
+- https://github.com/neondatabase/neon/issues/4088
+ - (re)resolutions:
+ - https://github.com/neondatabase/neon/pull/4696
+ - https://github.com/neondatabase/neon/pull/4094
+ - https://neondb.slack.com/archives/C033QLM5P7D/p1682519017949719
+
+Note that the test case introduced in https://github.com/neondatabase/neon/pull/4696/files#diff-13114949d1deb49ae394405d4c49558adad91150ba8a34004133653a8a5aeb76 will produce L1s with the same logical content, but, as outlined in the last paragraph of the _Problem's Implications For Compaction_ section above, we don't want to make that assumption in order to fix the problem.
+
+
+## Implementation Plan
+
+1. Remove support for `remote_storage=None`, because we now rely on the existence of an index part.
+
+ - The nasty part here is to fix all the tests that fiddle with the local timeline directory.
+ Possibly they are just irrelevant with this change, but, each case will require inspection.
+
+2. Implement the design above.
+
+ - Initially, ship without the mitigations for restart and accept we will do some work twice.
+ - Measure the impact and implement one of the mitigations.
+
diff --git a/pageserver/ctl/src/layers.rs b/pageserver/ctl/src/layers.rs
index 2af54902f7..608b3cecd6 100644
--- a/pageserver/ctl/src/layers.rs
+++ b/pageserver/ctl/src/layers.rs
@@ -68,7 +68,7 @@ async fn read_delta_file(path: impl AsRef) -> Result<()> {
},
)
.await?;
- let cursor = BlockCursor::new_fileblockreader_virtual(&file);
+ let cursor = BlockCursor::new_fileblockreader(&file);
for (k, v) in all {
let value = cursor.read_blob(v.pos()).await?;
println!("key:{} value_len:{}", k, value.len());
diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs
index 72a66d51a6..2a87ee0381 100644
--- a/pageserver/src/page_service.rs
+++ b/pageserver/src/page_service.rs
@@ -469,7 +469,9 @@ impl PageServerHandler {
// Create empty timeline
info!("creating new timeline");
let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?;
- let timeline = tenant.create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)?;
+ let timeline = tenant
+ .create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)
+ .await?;
// TODO mark timeline as not ready until it reaches end_lsn.
// We might have some wal to import as well, and we should prevent compute
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index 2168db57de..3256a00182 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -32,9 +32,7 @@ use std::fmt::Debug;
use std::fmt::Display;
use std::fs;
use std::fs::File;
-use std::fs::OpenOptions;
use std::io;
-use std::io::Write;
use std::ops::Bound::Included;
use std::path::Path;
use std::path::PathBuf;
@@ -68,7 +66,7 @@ use crate::task_mgr;
use crate::task_mgr::TaskKind;
use crate::tenant::config::TenantConfOpt;
use crate::tenant::metadata::load_metadata;
-use crate::tenant::remote_timeline_client::index::IndexPart;
+pub use crate::tenant::remote_timeline_client::index::IndexPart;
use crate::tenant::remote_timeline_client::MaybeDeletedIndexPart;
use crate::tenant::storage_layer::DeltaLayer;
use crate::tenant::storage_layer::ImageLayer;
@@ -115,7 +113,6 @@ pub mod block_io;
pub mod disk_btree;
pub(crate) mod ephemeral_file;
pub mod layer_map;
-pub mod manifest;
mod span;
pub mod metadata;
@@ -195,7 +192,7 @@ pub struct Tenant {
walredo_mgr: Arc,
// provides access to timeline data sitting in the remote storage
- remote_storage: Option,
+ pub(crate) remote_storage: Option,
/// Cached logical sizes updated updated on each [`Tenant::gather_size_inputs`].
cached_logical_sizes: tokio::sync::Mutex>,
@@ -407,7 +404,6 @@ impl Tenant {
remote_startup_data: Option,
local_metadata: Option,
ancestor: Option>,
- first_save: bool,
init_order: Option<&InitializationOrder>,
_ctx: &RequestContext,
) -> anyhow::Result<()> {
@@ -441,14 +437,9 @@ impl Tenant {
// Save the metadata file to local disk.
if !picked_local {
- save_metadata(
- self.conf,
- &tenant_id,
- &timeline_id,
- up_to_date_metadata,
- first_save,
- )
- .context("save_metadata")?;
+ save_metadata(self.conf, &tenant_id, &timeline_id, up_to_date_metadata)
+ .await
+ .context("save_metadata")?;
}
let index_part = remote_startup_data.as_ref().map(|x| &x.index_part);
@@ -833,7 +824,6 @@ impl Tenant {
}),
local_metadata,
ancestor,
- true,
None,
ctx,
)
@@ -1386,7 +1376,6 @@ impl Tenant {
remote_startup_data,
Some(local_metadata),
ancestor,
- false,
init_order,
ctx,
)
@@ -1450,7 +1439,7 @@ impl Tenant {
/// For tests, use `DatadirModification::init_empty_test_timeline` + `commit` to setup the
/// minimum amount of keys required to get a writable timeline.
/// (Without it, `put` might fail due to `repartition` failing.)
- pub fn create_empty_timeline(
+ pub async fn create_empty_timeline(
&self,
new_timeline_id: TimelineId,
initdb_lsn: Lsn,
@@ -1462,10 +1451,10 @@ impl Tenant {
"Cannot create empty timelines on inactive tenant"
);
- let timelines = self.timelines.lock().unwrap();
- let timeline_uninit_mark = self.create_timeline_uninit_mark(new_timeline_id, &timelines)?;
- drop(timelines);
-
+ let timeline_uninit_mark = {
+ let timelines = self.timelines.lock().unwrap();
+ self.create_timeline_uninit_mark(new_timeline_id, &timelines)?
+ };
let new_metadata = TimelineMetadata::new(
// Initialize disk_consistent LSN to 0, The caller must import some data to
// make it valid, before calling finish_creation()
@@ -1484,6 +1473,7 @@ impl Tenant {
initdb_lsn,
None,
)
+ .await
}
/// Helper for unit tests to create an empty timeline.
@@ -1499,7 +1489,9 @@ impl Tenant {
pg_version: u32,
ctx: &RequestContext,
) -> anyhow::Result> {
- let uninit_tl = self.create_empty_timeline(new_timeline_id, initdb_lsn, pg_version, ctx)?;
+ let uninit_tl = self
+ .create_empty_timeline(new_timeline_id, initdb_lsn, pg_version, ctx)
+ .await?;
let tline = uninit_tl.raw_timeline().expect("we just created it");
assert_eq!(tline.get_last_record_lsn(), Lsn(0));
@@ -1517,6 +1509,15 @@ impl Tenant {
tline.maybe_spawn_flush_loop();
tline.freeze_and_flush().await.context("freeze_and_flush")?;
+ // Make sure the freeze_and_flush reaches remote storage.
+ tline
+ .remote_client
+ .as_ref()
+ .unwrap()
+ .wait_completion()
+ .await
+ .unwrap();
+
let tl = uninit_tl.finish_creation()?;
// The non-test code would call tl.activate() here.
tl.set_state(TimelineState::Active);
@@ -1693,65 +1694,6 @@ impl Tenant {
Ok(())
}
- /// Flush all in-memory data to disk and remote storage, if any.
- ///
- /// Used at graceful shutdown.
- async fn freeze_and_flush_on_shutdown(&self) {
- let mut js = tokio::task::JoinSet::new();
-
- // execute on each timeline on the JoinSet, join after.
- let per_timeline = |timeline_id: TimelineId, timeline: Arc| {
- async move {
- debug_assert_current_span_has_tenant_and_timeline_id();
-
- match timeline.freeze_and_flush().await {
- Ok(()) => {}
- Err(e) => {
- warn!("failed to freeze and flush: {e:#}");
- return;
- }
- }
-
- let res = if let Some(client) = timeline.remote_client.as_ref() {
- // if we did not wait for completion here, it might be our shutdown process
- // didn't wait for remote uploads to complete at all, as new tasks can forever
- // be spawned.
- //
- // what is problematic is the shutting down of RemoteTimelineClient, because
- // obviously it does not make sense to stop while we wait for it, but what
- // about corner cases like s3 suddenly hanging up?
- client.wait_completion().await
- } else {
- Ok(())
- };
-
- if let Err(e) = res {
- warn!("failed to await for frozen and flushed uploads: {e:#}");
- }
- }
- .instrument(tracing::info_span!("freeze_and_flush_on_shutdown", %timeline_id))
- };
-
- {
- let timelines = self.timelines.lock().unwrap();
- timelines
- .iter()
- .map(|(id, tl)| (*id, Arc::clone(tl)))
- .for_each(|(timeline_id, timeline)| {
- js.spawn(per_timeline(timeline_id, timeline));
- })
- };
-
- while let Some(res) = js.join_next().await {
- match res {
- Ok(()) => {}
- Err(je) if je.is_cancelled() => unreachable!("no cancelling used"),
- Err(je) if je.is_panic() => { /* logged already */ }
- Err(je) => warn!("unexpected JoinError: {je:?}"),
- }
- }
- }
-
pub fn current_state(&self) -> TenantState {
self.state.borrow().clone()
}
@@ -1882,19 +1824,22 @@ impl Tenant {
}
};
- if freeze_and_flush {
- // walreceiver has already began to shutdown with TenantState::Stopping, but we need to
- // await for them to stop.
- task_mgr::shutdown_tasks(
- Some(TaskKind::WalReceiverManager),
- Some(self.tenant_id),
- None,
- )
- .await;
-
- // this will wait for uploads to complete; in the past, it was done outside tenant
- // shutdown in pageserver::shutdown_pageserver.
- self.freeze_and_flush_on_shutdown().await;
+ let mut js = tokio::task::JoinSet::new();
+ {
+ let timelines = self.timelines.lock().unwrap();
+ timelines.values().for_each(|timeline| {
+ let timeline = Arc::clone(timeline);
+ let span = Span::current();
+ js.spawn(async move { timeline.shutdown(freeze_and_flush).instrument(span).await });
+ })
+ };
+ while let Some(res) = js.join_next().await {
+ match res {
+ Ok(()) => {}
+ Err(je) if je.is_cancelled() => unreachable!("no cancelling used"),
+ Err(je) if je.is_panic() => { /* logged already */ }
+ Err(je) => warn!("unexpected JoinError: {je:?}"),
+ }
}
// shutdown all tenant and timeline tasks: gc, compaction, page service
@@ -2421,72 +2366,37 @@ impl Tenant {
Ok(tenant_conf)
}
- pub(super) fn persist_tenant_config(
+ #[tracing::instrument(skip_all, fields(%tenant_id))]
+ pub(super) async fn persist_tenant_config(
tenant_id: &TenantId,
target_config_path: &Path,
tenant_conf: TenantConfOpt,
- creating_tenant: bool,
) -> anyhow::Result<()> {
- let _enter = info_span!("saving tenantconf").entered();
-
// imitate a try-block with a closure
- let do_persist = |target_config_path: &Path| -> anyhow::Result<()> {
- let target_config_parent = target_config_path.parent().with_context(|| {
- format!(
- "Config path does not have a parent: {}",
- target_config_path.display()
- )
- })?;
+ info!("persisting tenantconf to {}", target_config_path.display());
- info!("persisting tenantconf to {}", target_config_path.display());
-
- let mut conf_content = r#"# This file contains a specific per-tenant's config.
+ let mut conf_content = r#"# This file contains a specific per-tenant's config.
# It is read in case of pageserver restart.
[tenant_config]
"#
- .to_string();
+ .to_string();
- // Convert the config to a toml file.
- conf_content += &toml_edit::ser::to_string(&tenant_conf)?;
+ // Convert the config to a toml file.
+ conf_content += &toml_edit::ser::to_string(&tenant_conf)?;
- let mut target_config_file = VirtualFile::open_with_options(
- target_config_path,
- OpenOptions::new()
- .truncate(true) // This needed for overwriting with small config files
- .write(true)
- .create_new(creating_tenant)
- // when creating a new tenant, first_save will be true and `.create(true)` will be
- // ignored (per rust std docs).
- //
- // later when updating the config of created tenant, or persisting config for the
- // first time for attached tenant, the `.create(true)` is used.
- .create(true),
- )?;
+ let conf_content = conf_content.as_bytes();
- target_config_file
- .write(conf_content.as_bytes())
- .context("write toml bytes into file")
- .and_then(|_| target_config_file.sync_all().context("fsync config file"))
- .context("write config file")?;
-
- // fsync the parent directory to ensure the directory entry is durable.
- // before this was done conditionally on creating_tenant, but these management actions are rare
- // enough to just fsync it always.
-
- crashsafe::fsync(target_config_parent)?;
- // XXX we're not fsyncing the parent dir, need to do that in case `creating_tenant`
- Ok(())
- };
-
- // this function is called from creating the tenant and updating the tenant config, which
- // would otherwise share this context, so keep it here in one place.
- do_persist(target_config_path).with_context(|| {
- format!(
- "write tenant {tenant_id} config to {}",
- target_config_path.display()
- )
- })
+ let temp_path = path_with_suffix_extension(target_config_path, TEMP_FILE_SUFFIX);
+ VirtualFile::crashsafe_overwrite(target_config_path, &temp_path, conf_content)
+ .await
+ .with_context(|| {
+ format!(
+ "write tenant {tenant_id} config to {}",
+ target_config_path.display()
+ )
+ })?;
+ Ok(())
}
//
@@ -2797,13 +2707,15 @@ impl Tenant {
src_timeline.pg_version,
);
- let uninitialized_timeline = self.prepare_new_timeline(
- dst_id,
- &metadata,
- timeline_uninit_mark,
- start_lsn + 1,
- Some(Arc::clone(src_timeline)),
- )?;
+ let uninitialized_timeline = self
+ .prepare_new_timeline(
+ dst_id,
+ &metadata,
+ timeline_uninit_mark,
+ start_lsn + 1,
+ Some(Arc::clone(src_timeline)),
+ )
+ .await?;
let new_timeline = uninitialized_timeline.finish_creation()?;
@@ -2881,13 +2793,15 @@ impl Tenant {
pgdata_lsn,
pg_version,
);
- let raw_timeline = self.prepare_new_timeline(
- timeline_id,
- &new_metadata,
- timeline_uninit_mark,
- pgdata_lsn,
- None,
- )?;
+ let raw_timeline = self
+ .prepare_new_timeline(
+ timeline_id,
+ &new_metadata,
+ timeline_uninit_mark,
+ pgdata_lsn,
+ None,
+ )
+ .await?;
let tenant_id = raw_timeline.owning_tenant.tenant_id;
let unfinished_timeline = raw_timeline.raw_timeline()?;
@@ -2958,7 +2872,7 @@ impl Tenant {
/// at 'disk_consistent_lsn'. After any initial data has been imported, call
/// `finish_creation` to insert the Timeline into the timelines map and to remove the
/// uninit mark file.
- fn prepare_new_timeline(
+ async fn prepare_new_timeline(
&self,
new_timeline_id: TimelineId,
new_metadata: &TimelineMetadata,
@@ -2986,8 +2900,9 @@ impl Tenant {
timeline_struct.init_empty_layer_map(start_lsn);
- if let Err(e) =
- self.create_timeline_files(&uninit_mark.timeline_path, &new_timeline_id, new_metadata)
+ if let Err(e) = self
+ .create_timeline_files(&uninit_mark.timeline_path, &new_timeline_id, new_metadata)
+ .await
{
error!("Failed to create initial files for timeline {tenant_id}/{new_timeline_id}, cleaning up: {e:?}");
cleanup_timeline_directory(uninit_mark);
@@ -3003,7 +2918,7 @@ impl Tenant {
))
}
- fn create_timeline_files(
+ async fn create_timeline_files(
&self,
timeline_path: &Path,
new_timeline_id: &TimelineId,
@@ -3015,14 +2930,9 @@ impl Tenant {
anyhow::bail!("failpoint after-timeline-uninit-mark-creation");
});
- save_metadata(
- self.conf,
- &self.tenant_id,
- new_timeline_id,
- new_metadata,
- true,
- )
- .context("Failed to create timeline metadata")?;
+ save_metadata(self.conf, &self.tenant_id, new_timeline_id, new_metadata)
+ .await
+ .context("Failed to create timeline metadata")?;
Ok(())
}
@@ -3169,7 +3079,7 @@ pub(crate) enum CreateTenantFilesMode {
Attach,
}
-pub(crate) fn create_tenant_files(
+pub(crate) async fn create_tenant_files(
conf: &'static PageServerConf,
tenant_conf: TenantConfOpt,
tenant_id: &TenantId,
@@ -3205,7 +3115,8 @@ pub(crate) fn create_tenant_files(
mode,
&temporary_tenant_dir,
&target_tenant_directory,
- );
+ )
+ .await;
if creation_result.is_err() {
error!("Failed to create directory structure for tenant {tenant_id}, cleaning tmp data");
@@ -3223,7 +3134,7 @@ pub(crate) fn create_tenant_files(
Ok(target_tenant_directory)
}
-fn try_create_target_tenant_dir(
+async fn try_create_target_tenant_dir(
conf: &'static PageServerConf,
tenant_conf: TenantConfOpt,
tenant_id: &TenantId,
@@ -3262,7 +3173,7 @@ fn try_create_target_tenant_dir(
)
.with_context(|| format!("resolve tenant {tenant_id} temporary config path"))?;
- Tenant::persist_tenant_config(tenant_id, &temporary_tenant_config_path, tenant_conf, true)?;
+ Tenant::persist_tenant_config(tenant_id, &temporary_tenant_config_path, tenant_conf).await?;
crashsafe::create_dir(&temporary_tenant_timelines_dir).with_context(|| {
format!(
@@ -3467,6 +3378,8 @@ pub mod harness {
pub tenant_conf: TenantConf,
pub tenant_id: TenantId,
pub generation: Generation,
+ remote_storage: GenericRemoteStorage,
+ pub remote_fs_dir: PathBuf,
}
static LOG_HANDLE: OnceCell<()> = OnceCell::new();
@@ -3504,29 +3417,39 @@ pub mod harness {
fs::create_dir_all(conf.tenant_path(&tenant_id))?;
fs::create_dir_all(conf.timelines_path(&tenant_id))?;
+ use remote_storage::{RemoteStorageConfig, RemoteStorageKind};
+ let remote_fs_dir = conf.workdir.join("localfs");
+ std::fs::create_dir_all(&remote_fs_dir).unwrap();
+ let config = RemoteStorageConfig {
+ // TODO: why not remote_storage::DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS,
+ max_concurrent_syncs: std::num::NonZeroUsize::new(2_000_000).unwrap(),
+ // TODO: why not remote_storage::DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS,
+ max_sync_errors: std::num::NonZeroU32::new(3_000_000).unwrap(),
+ storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()),
+ };
+ let remote_storage = GenericRemoteStorage::from_config(&config).unwrap();
+
Ok(Self {
conf,
tenant_conf,
tenant_id,
generation: Generation::new(0xdeadbeef),
+ remote_storage,
+ remote_fs_dir,
})
}
pub async fn load(&self) -> (Arc<Tenant>, RequestContext) {
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
(
- self.try_load(&ctx, None)
+ self.try_load(&ctx)
.await
.expect("failed to load test tenant"),
ctx,
)
}
- pub async fn try_load(
- &self,
- ctx: &RequestContext,
- remote_storage: Option<GenericRemoteStorage>,
- ) -> anyhow::Result<Arc<Tenant>> {
+ pub async fn try_load(&self, ctx: &RequestContext) -> anyhow::Result<Arc<Tenant>> {
let walredo_mgr = Arc::new(TestRedoManager);
let tenant = Arc::new(Tenant::new(
@@ -3536,7 +3459,7 @@ pub mod harness {
walredo_mgr,
self.tenant_id,
self.generation,
- remote_storage,
+ Some(self.remote_storage.clone()),
));
tenant
.load(None, ctx)
@@ -3649,7 +3572,10 @@ mod tests {
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await?;
- match tenant.create_empty_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx) {
+ match tenant
+ .create_empty_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
+ .await
+ {
Ok(_) => panic!("duplicate timeline creation should fail"),
Err(e) => assert_eq!(
e.to_string(),
@@ -4004,6 +3930,13 @@ mod tests {
.create_test_timeline(TIMELINE_ID, Lsn(0x7000), DEFAULT_PG_VERSION, &ctx)
.await?;
make_some_layers(tline.as_ref(), Lsn(0x8000)).await?;
+ // so that all uploads finish & we can call harness.load() below again
+ tenant
+ .shutdown(Default::default(), true)
+ .instrument(info_span!("test_shutdown", tenant_id=%tenant.tenant_id))
+ .await
+ .ok()
+ .unwrap();
}
let (tenant, _ctx) = harness.load().await;
@@ -4037,6 +3970,14 @@ mod tests {
.expect("Should have a local timeline");
make_some_layers(newtline.as_ref(), Lsn(0x60)).await?;
+
+ // so that all uploads finish & we can call harness.load() below again
+ tenant
+ .shutdown(Default::default(), true)
+ .instrument(info_span!("test_shutdown", tenant_id=%tenant.tenant_id))
+ .await
+ .ok()
+ .unwrap();
}
// check that both of them are initially unloaded
@@ -4089,6 +4030,13 @@ mod tests {
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await?;
drop(tline);
+ // so that all uploads finish & we can call harness.try_load() below again
+ tenant
+ .shutdown(Default::default(), true)
+ .instrument(info_span!("test_shutdown", tenant_id=%tenant.tenant_id))
+ .await
+ .ok()
+ .unwrap();
drop(tenant);
let metadata_path = harness.timeline_path(&TIMELINE_ID).join(METADATA_FILE_NAME);
@@ -4100,11 +4048,7 @@ mod tests {
metadata_bytes[8] ^= 1;
std::fs::write(metadata_path, metadata_bytes)?;
- let err = harness
- .try_load(&ctx, None)
- .await
- .err()
- .expect("should fail");
+ let err = harness.try_load(&ctx).await.err().expect("should fail");
// get all the stack with all .context, not only the last one
let message = format!("{err:#}");
let expected = "failed to load metadata";
@@ -4489,8 +4433,9 @@ mod tests {
.await;
let initdb_lsn = Lsn(0x20);
- let utline =
- tenant.create_empty_timeline(TIMELINE_ID, initdb_lsn, DEFAULT_PG_VERSION, &ctx)?;
+ let utline = tenant
+ .create_empty_timeline(TIMELINE_ID, initdb_lsn, DEFAULT_PG_VERSION, &ctx)
+ .await?;
let tline = utline.raw_timeline().unwrap();
// Spawn flush loop now so that we can set the `expect_initdb_optimization`
@@ -4555,9 +4500,15 @@ mod tests {
let harness = TenantHarness::create(name)?;
{
let (tenant, ctx) = harness.load().await;
- let tline =
- tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
+ let tline = tenant
+ .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
+ .await?;
// Keeps uninit mark in place
+ let raw_tline = tline.raw_timeline().unwrap();
+ raw_tline
+ .shutdown(false)
+ .instrument(info_span!("test_shutdown", tenant_id=%raw_tline.tenant_id))
+ .await;
std::mem::forget(tline);
}
diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs
index f5ff15b50c..e4dede2c30 100644
--- a/pageserver/src/tenant/blob_io.rs
+++ b/pageserver/src/tenant/blob_io.rs
@@ -96,18 +96,12 @@ pub trait BlobWriter {
/// An implementation of BlobWriter to write blobs to anything that
/// implements std::io::Write.
///
-pub struct WriteBlobWriter<W>
-where
- W: std::io::Write,
-{
+pub struct WriteBlobWriter<W> {
inner: W,
offset: u64,
}
-impl<W> WriteBlobWriter<W>
-where
- W: std::io::Write,
-{
+impl<W> WriteBlobWriter<W> {
pub fn new(inner: W, start_offset: u64) -> Self {
WriteBlobWriter {
inner,
diff --git a/pageserver/src/tenant/block_io.rs b/pageserver/src/tenant/block_io.rs
index 69d5b49c6d..645ec81036 100644
--- a/pageserver/src/tenant/block_io.rs
+++ b/pageserver/src/tenant/block_io.rs
@@ -7,9 +7,7 @@ use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner};
use crate::page_cache::{self, PageReadGuard, ReadBufResult, PAGE_SZ};
use crate::virtual_file::VirtualFile;
use bytes::Bytes;
-use std::fs::File;
use std::ops::{Deref, DerefMut};
-use std::os::unix::fs::FileExt;
/// This is implemented by anything that can read 8 kB (PAGE_SZ)
/// blocks, using the page cache
@@ -73,8 +71,7 @@ impl<'a> Deref for BlockLease<'a> {
///
/// Unlike traits, we also support the read function to be async though.
pub(crate) enum BlockReaderRef<'a> {
- FileBlockReaderVirtual(&'a FileBlockReader<VirtualFile>),
- FileBlockReaderFile(&'a FileBlockReader<File>),
+ FileBlockReaderVirtual(&'a FileBlockReader),
EphemeralFile(&'a EphemeralFile),
Adapter(Adapter<&'a DeltaLayerInner>),
#[cfg(test)]
@@ -87,7 +84,6 @@ impl<'a> BlockReaderRef<'a> {
use BlockReaderRef::*;
match self {
FileBlockReaderVirtual(r) => r.read_blk(blknum).await,
- FileBlockReaderFile(r) => r.read_blk(blknum).await,
EphemeralFile(r) => r.read_blk(blknum).await,
Adapter(r) => r.read_blk(blknum).await,
#[cfg(test)]
@@ -105,7 +101,7 @@ impl<'a> BlockReaderRef<'a> {
///
/// ```no_run
/// # use pageserver::tenant::block_io::{BlockReader, FileBlockReader};
-/// # let reader: FileBlockReader = unimplemented!("stub");
+/// # let reader: FileBlockReader = unimplemented!("stub");
/// let cursor = reader.block_cursor();
/// let buf = cursor.read_blk(1);
/// // do stuff with 'buf'
@@ -122,7 +118,7 @@ impl<'a> BlockCursor<'a> {
BlockCursor { reader }
}
// Needed by cli
- pub fn new_fileblockreader_virtual(reader: &'a FileBlockReader<VirtualFile>) -> Self {
+ pub fn new_fileblockreader(reader: &'a FileBlockReader) -> Self {
BlockCursor {
reader: BlockReaderRef::FileBlockReaderVirtual(reader),
}
@@ -143,27 +139,26 @@ impl<'a> BlockCursor<'a> {
///
/// The file is assumed to be immutable. This doesn't provide any functions
/// for modifying the file, nor for invalidating the cache if it is modified.
-pub struct FileBlockReader<F> {
- pub file: F,
+pub struct FileBlockReader {
+ pub file: VirtualFile,
/// Unique ID of this file, used as key in the page cache.
file_id: page_cache::FileId,
}
-impl<F> FileBlockReader<F>
-where
- F: FileExt,
-{
- pub fn new(file: F) -> Self {
+impl FileBlockReader {
+ pub fn new(file: VirtualFile) -> Self {
let file_id = page_cache::next_file_id();
FileBlockReader { file_id, file }
}
/// Read a page from the underlying file into given buffer.
- fn fill_buffer(&self, buf: &mut [u8], blkno: u32) -> Result<(), std::io::Error> {
+ async fn fill_buffer(&self, buf: &mut [u8], blkno: u32) -> Result<(), std::io::Error> {
assert!(buf.len() == PAGE_SZ);
- self.file.read_exact_at(buf, blkno as u64 * PAGE_SZ as u64)
+ self.file
+ .read_exact_at(buf, blkno as u64 * PAGE_SZ as u64)
+ .await
}
/// Read a block.
///
@@ -185,7 +180,7 @@ where
ReadBufResult::Found(guard) => break Ok(guard.into()),
ReadBufResult::NotFound(mut write_guard) => {
// Read the page from disk into the buffer
- self.fill_buffer(write_guard.deref_mut(), blknum)?;
+ self.fill_buffer(write_guard.deref_mut(), blknum).await?;
write_guard.mark_valid();
// Swap for read lock
@@ -196,13 +191,7 @@ where
}
}
-impl BlockReader for FileBlockReader<File> {
- fn block_cursor(&self) -> BlockCursor<'_> {
- BlockCursor::new(BlockReaderRef::FileBlockReaderFile(self))
- }
-}
-
-impl BlockReader for FileBlockReader<VirtualFile> {
+impl BlockReader for FileBlockReader {
fn block_cursor(&self) -> BlockCursor<'_> {
BlockCursor::new(BlockReaderRef::FileBlockReaderVirtual(self))
}
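The block addressing used by `fill_buffer` above (block `blkno` starts at byte offset `blkno * PAGE_SZ`) can be sketched with the standard library alone. This is a minimal synchronous illustration, not the pageserver's code: `read_block` is a hypothetical helper, it uses `std::fs::File` with the Unix-only `FileExt::read_exact_at` rather than the async `VirtualFile`, it bypasses the page cache entirely, and `PAGE_SZ = 8192` is taken from the "8 kB" remark in the comment above.

```rust
use std::fs::File;
use std::io;
use std::os::unix::fs::FileExt;

/// Assumed page size (8 kB), mirroring the PAGE_SZ constant mentioned in the diff.
const PAGE_SZ: usize = 8192;

/// Hypothetical helper: read block number `blkno` from `file`.
/// Returns an error (UnexpectedEof) if the file does not contain the full block.
fn read_block(file: &File, blkno: u32) -> io::Result<[u8; PAGE_SZ]> {
    let mut buf = [0u8; PAGE_SZ];
    // Widen to u64 before multiplying so large block numbers cannot overflow.
    let offset = blkno as u64 * PAGE_SZ as u64;
    // Positional read: does not move the file cursor, so shared `&File` access is fine.
    file.read_exact_at(&mut buf, offset)?;
    Ok(buf)
}
```

Calling `read_block(&file, 1)` on a file of two pages returns the second page; asking for block 2 fails because the read would run past the end of the file.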
diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs
index 31db3869d9..4c5fe424f3 100644
--- a/pageserver/src/tenant/ephemeral_file.rs
+++ b/pageserver/src/tenant/ephemeral_file.rs
@@ -9,7 +9,6 @@ use std::cmp::min;
use std::fs::OpenOptions;
use std::io::{self, ErrorKind};
use std::ops::DerefMut;
-use std::os::unix::prelude::FileExt;
use std::path::PathBuf;
use std::sync::atomic::AtomicU64;
use tracing::*;
@@ -88,7 +87,8 @@ impl EphemeralFile {
let buf: &mut [u8] = write_guard.deref_mut();
debug_assert_eq!(buf.len(), PAGE_SZ);
self.file
- .read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64)?;
+ .read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64)
+ .await?;
write_guard.mark_valid();
// Swap for read lock
@@ -128,10 +128,15 @@ impl EphemeralFile {
self.off += n;
src_remaining = &src_remaining[n..];
if self.off == PAGE_SZ {
- match self.ephemeral_file.file.write_all_at(
- &self.ephemeral_file.mutable_tail,
- self.blknum as u64 * PAGE_SZ as u64,
- ) {
+ match self
+ .ephemeral_file
+ .file
+ .write_all_at(
+ &self.ephemeral_file.mutable_tail,
+ self.blknum as u64 * PAGE_SZ as u64,
+ )
+ .await
+ {
Ok(_) => {
// Pre-warm the page cache with what we just wrote.
// This isn't necessary for coherency/correctness, but it's how we've always done it.
diff --git a/pageserver/src/tenant/manifest.rs b/pageserver/src/tenant/manifest.rs
deleted file mode 100644
index 1d2835114f..0000000000
--- a/pageserver/src/tenant/manifest.rs
+++ /dev/null
@@ -1,325 +0,0 @@
-//! This module contains the encoding and decoding of the local manifest file.
-//!
-//! MANIFEST is a write-ahead log which is stored locally to each timeline. It
-//! records the state of the storage engine. It contains a snapshot of the
-//! state and all operations following that snapshot. The file begins with a
-//! header recording MANIFEST version number. After that, it contains a snapshot.
-//! The snapshot is followed by a list of operations. Each operation is a list
-//! of records. Each record is either an addition or a removal of a layer.
-//!
-//! With MANIFEST, we can:
-//!
-//! 1. recover state quickly by reading the file, potentially boosting the
-//! startup speed.
-//! 2. ensure all operations are atomic and avoid corruption, solving issues
-//! like redundant image layer and preparing us for future compaction
-//! strategies.
-//!
-//! There is also a format for storing all layer files on S3, called
-//! `index_part.json`. Compared with index_part, MANIFEST is a WAL which
-//! records all operations as logs, and therefore we can easily replay the
-//! operations when recovering from crash, while ensuring those operations
-//! are atomic upon restart.
-//!
-//! Currently, this is not used in the system. Future refactors will ensure
-//! the storage state will be recorded in this file, and the system can be
-//! recovered from this file. This is tracked in
-//!
-
-use std::io::{self, Read, Write};
-
-use crate::virtual_file::VirtualFile;
-use anyhow::Result;
-use bytes::{Buf, BufMut, Bytes, BytesMut};
-use crc32c::crc32c;
-use serde::{Deserialize, Serialize};
-use tracing::log::warn;
-use utils::lsn::Lsn;
-
-use super::storage_layer::PersistentLayerDesc;
-
-pub struct Manifest {
- file: VirtualFile,
-}
-
-#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug)]
-pub struct Snapshot {
- pub layers: Vec<PersistentLayerDesc>,
-}
-
-/// serde by default encode this in tagged enum, and therefore it will be something
-/// like `{ "AddLayer": { ... } }`.
-#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug)]
-pub enum Record {
- AddLayer(PersistentLayerDesc),
- RemoveLayer(PersistentLayerDesc),
-}
-
-/// `echo neon.manifest | sha1sum` and take the leading 8 bytes.
-const MANIFEST_MAGIC_NUMBER: u64 = 0xf5c44592b806109c;
-const MANIFEST_VERSION: u64 = 1;
-
-#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug)]
-pub struct ManifestHeader {
- magic_number: u64,
- version: u64,
-}
-
-const MANIFEST_HEADER_LEN: usize = 16;
-
-impl ManifestHeader {
- fn encode(&self) -> BytesMut {
- let mut buf = BytesMut::with_capacity(MANIFEST_HEADER_LEN);
- buf.put_u64(self.magic_number);
- buf.put_u64(self.version);
- buf
- }
-
- fn decode(mut buf: &[u8]) -> Self {
- assert!(buf.len() == MANIFEST_HEADER_LEN, "invalid header");
- Self {
- magic_number: buf.get_u64(),
- version: buf.get_u64(),
- }
- }
-}
-
-#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug)]
-pub enum Operation {
- /// A snapshot of the current state.
- ///
- /// Lsn field represents the LSN that is persisted to disk for this snapshot.
- Snapshot(Snapshot, Lsn),
- /// An atomic operation that changes the state.
- ///
- /// Lsn field represents the LSN that is persisted to disk after the operation is done.
- /// This will only change when new L0 is flushed to the disk.
- Operation(Vec<Record>, Lsn),
-}
-
-struct RecordHeader {
- size: u32,
- checksum: u32,
-}
-
-const RECORD_HEADER_LEN: usize = 8;
-
-impl RecordHeader {
- fn encode(&self) -> BytesMut {
- let mut buf = BytesMut::with_capacity(RECORD_HEADER_LEN);
- buf.put_u32(self.size);
- buf.put_u32(self.checksum);
- buf
- }
-
- fn decode(mut buf: &[u8]) -> Self {
- assert!(buf.len() == RECORD_HEADER_LEN, "invalid header");
- Self {
- size: buf.get_u32(),
- checksum: buf.get_u32(),
- }
- }
-}
-
-#[derive(Debug, thiserror::Error)]
-pub enum ManifestLoadError {
- #[error("manifest header is corrupted")]
- CorruptedManifestHeader,
- #[error("unsupported manifest version: got {0}, expected {1}")]
- UnsupportedVersion(u64, u64),
- #[error("error when decoding record: {0}")]
- DecodeRecord(serde_json::Error),
- #[error("I/O error: {0}")]
- Io(io::Error),
-}
-
-#[must_use = "Should check if the manifest is partially corrupted"]
-pub struct ManifestPartiallyCorrupted(bool);
-
-impl Manifest {
- /// Create a new manifest by writing the manifest header and a snapshot record to the given file.
- pub fn init(file: VirtualFile, snapshot: Snapshot, lsn: Lsn) -> Result<Self> {
- let mut manifest = Self { file };
- manifest.append_manifest_header(ManifestHeader {
- magic_number: MANIFEST_MAGIC_NUMBER,
- version: MANIFEST_VERSION,
- })?;
- manifest.append_operation(Operation::Snapshot(snapshot, lsn))?;
- Ok(manifest)
- }
-
- /// Load a manifest. Returns the manifest and a list of operations. If the manifest is corrupted,
- /// the bool flag will be set to true and the user is responsible to reconstruct a new manifest and
- /// backup the current one.
- pub fn load(
- mut file: VirtualFile,
- ) -> Result<(Self, Vec<Operation>, ManifestPartiallyCorrupted), ManifestLoadError> {
- let mut buf = vec![];
- file.read_to_end(&mut buf).map_err(ManifestLoadError::Io)?;
-
- // Read manifest header
- let mut buf = Bytes::from(buf);
- if buf.remaining() < MANIFEST_HEADER_LEN {
- return Err(ManifestLoadError::CorruptedManifestHeader);
- }
- let header = ManifestHeader::decode(&buf[..MANIFEST_HEADER_LEN]);
- buf.advance(MANIFEST_HEADER_LEN);
- if header.version != MANIFEST_VERSION {
- return Err(ManifestLoadError::UnsupportedVersion(
- header.version,
- MANIFEST_VERSION,
- ));
- }
-
- // Read operations
- let mut operations = Vec::new();
- let corrupted = loop {
- if buf.remaining() == 0 {
- break false;
- }
- if buf.remaining() < RECORD_HEADER_LEN {
- warn!("incomplete header when decoding manifest, could be corrupted");
- break true;
- }
- let RecordHeader { size, checksum } = RecordHeader::decode(&buf[..RECORD_HEADER_LEN]);
- let size = size as usize;
- buf.advance(RECORD_HEADER_LEN);
- if buf.remaining() < size {
- warn!("incomplete data when decoding manifest, could be corrupted");
- break true;
- }
- let data = &buf[..size];
- if crc32c(data) != checksum {
- warn!("checksum mismatch when decoding manifest, could be corrupted");
- break true;
- }
- // if the following decode fails, we cannot use the manifest or safely ignore any record.
- operations.push(serde_json::from_slice(data).map_err(ManifestLoadError::DecodeRecord)?);
- buf.advance(size);
- };
- Ok((
- Self { file },
- operations,
- ManifestPartiallyCorrupted(corrupted),
- ))
- }
-
- fn append_data(&mut self, data: &[u8]) -> Result<()> {
- if data.len() >= u32::MAX as usize {
- panic!("data too large");
- }
- let header = RecordHeader {
- size: data.len() as u32,
- checksum: crc32c(data),
- };
- let header = header.encode();
- self.file.write_all(&header)?;
- self.file.write_all(data)?;
- self.file.sync_all()?;
- Ok(())
- }
-
- fn append_manifest_header(&mut self, header: ManifestHeader) -> Result<()> {
- let encoded = header.encode();
- self.file.write_all(&encoded)?;
- Ok(())
- }
-
- /// Add an operation to the manifest. The operation will be appended to the end of the file,
- /// and the file will fsync.
- pub fn append_operation(&mut self, operation: Operation) -> Result<()> {
- let encoded = Vec::from(serde_json::to_string(&operation)?);
- self.append_data(&encoded)
- }
-}
-
-#[cfg(test)]
-mod tests {
- use std::fs::OpenOptions;
-
- use crate::repository::Key;
-
- use super::*;
-
- #[test]
- fn test_read_manifest() {
- let testdir = crate::config::PageServerConf::test_repo_dir("test_read_manifest");
- std::fs::create_dir_all(&testdir).unwrap();
- let file = VirtualFile::create(&testdir.join("MANIFEST")).unwrap();
- let layer1 = PersistentLayerDesc::new_test(Key::from_i128(0)..Key::from_i128(233));
- let layer2 = PersistentLayerDesc::new_test(Key::from_i128(233)..Key::from_i128(2333));
- let layer3 = PersistentLayerDesc::new_test(Key::from_i128(2333)..Key::from_i128(23333));
- let layer4 = PersistentLayerDesc::new_test(Key::from_i128(23333)..Key::from_i128(233333));
-
- // Write a manifest with a snapshot and some operations
- let snapshot = Snapshot {
- layers: vec![layer1, layer2],
- };
- let mut manifest = Manifest::init(file, snapshot.clone(), Lsn::from(0)).unwrap();
- manifest
- .append_operation(Operation::Operation(
- vec![Record::AddLayer(layer3.clone())],
- Lsn::from(1),
- ))
- .unwrap();
- drop(manifest);
-
- // Open the second time and write
- let file = VirtualFile::open_with_options(
- &testdir.join("MANIFEST"),
- OpenOptions::new()
- .read(true)
- .write(true)
- .create_new(false)
- .truncate(false),
- )
- .unwrap();
- let (mut manifest, operations, corrupted) = Manifest::load(file).unwrap();
- assert!(!corrupted.0);
- assert_eq!(operations.len(), 2);
- assert_eq!(
- &operations[0],
- &Operation::Snapshot(snapshot.clone(), Lsn::from(0))
- );
- assert_eq!(
- &operations[1],
- &Operation::Operation(vec![Record::AddLayer(layer3.clone())], Lsn::from(1))
- );
- manifest
- .append_operation(Operation::Operation(
- vec![
- Record::RemoveLayer(layer3.clone()),
- Record::AddLayer(layer4.clone()),
- ],
- Lsn::from(2),
- ))
- .unwrap();
- drop(manifest);
-
- // Open the third time and verify
- let file = VirtualFile::open_with_options(
- &testdir.join("MANIFEST"),
- OpenOptions::new()
- .read(true)
- .write(true)
- .create_new(false)
- .truncate(false),
- )
- .unwrap();
- let (_manifest, operations, corrupted) = Manifest::load(file).unwrap();
- assert!(!corrupted.0);
- assert_eq!(operations.len(), 3);
- assert_eq!(&operations[0], &Operation::Snapshot(snapshot, Lsn::from(0)));
- assert_eq!(
- &operations[1],
- &Operation::Operation(vec![Record::AddLayer(layer3.clone())], Lsn::from(1))
- );
- assert_eq!(
- &operations[2],
- &Operation::Operation(
- vec![Record::RemoveLayer(layer3), Record::AddLayer(layer4)],
- Lsn::from(2)
- )
- );
- }
-}
diff --git a/pageserver/src/tenant/metadata.rs b/pageserver/src/tenant/metadata.rs
index dbf2d5ac37..7b05704e4f 100644
--- a/pageserver/src/tenant/metadata.rs
+++ b/pageserver/src/tenant/metadata.rs
@@ -8,14 +8,13 @@
//!
//! [`remote_timeline_client`]: super::remote_timeline_client
-use std::fs::{File, OpenOptions};
-use std::io::{self, Write};
+use std::io::{self};
-use anyhow::{bail, ensure, Context};
+use anyhow::{ensure, Context};
use serde::{de::Error, Deserialize, Serialize, Serializer};
use thiserror::Error;
-use tracing::info_span;
use utils::bin_ser::SerializeError;
+use utils::crashsafe::path_with_suffix_extension;
use utils::{
bin_ser::BeSer,
id::{TenantId, TimelineId},
@@ -24,6 +23,7 @@ use utils::{
use crate::config::PageServerConf;
use crate::virtual_file::VirtualFile;
+use crate::TEMP_FILE_SUFFIX;
/// Use special format number to enable backward compatibility.
const METADATA_FORMAT_VERSION: u16 = 4;
@@ -255,38 +255,19 @@ impl Serialize for TimelineMetadata {
}
/// Save timeline metadata to file
-pub fn save_metadata(
+#[tracing::instrument(skip_all, fields(%tenant_id, %timeline_id))]
+pub async fn save_metadata(
conf: &'static PageServerConf,
tenant_id: &TenantId,
timeline_id: &TimelineId,
data: &TimelineMetadata,
- first_save: bool,
) -> anyhow::Result<()> {
- let _enter = info_span!("saving metadata").entered();
let path = conf.metadata_path(tenant_id, timeline_id);
- // use OpenOptions to ensure file presence is consistent with first_save
- let mut file = VirtualFile::open_with_options(
- &path,
- OpenOptions::new().write(true).create_new(first_save),
- )
- .context("open_with_options")?;
-
- let metadata_bytes = data.to_bytes().context("Failed to get metadata bytes")?;
-
- if file.write(&metadata_bytes)? != metadata_bytes.len() {
- bail!("Could not write all the metadata bytes in a single call");
- }
- file.sync_all()?;
-
- // fsync the parent directory to ensure the directory entry is durable
- if first_save {
- let timeline_dir = File::open(
- path.parent()
- .expect("Metadata should always have a parent dir"),
- )?;
- timeline_dir.sync_all()?;
- }
-
+ let temp_path = path_with_suffix_extension(&path, TEMP_FILE_SUFFIX);
+ let metadata_bytes = data.to_bytes().context("serialize metadata")?;
+ VirtualFile::crashsafe_overwrite(&path, &temp_path, &metadata_bytes)
+ .await
+ .context("write metadata")?;
Ok(())
}
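The new `save_metadata` above delegates to `VirtualFile::crashsafe_overwrite`, whose body is not part of this diff. The general write-to-temp-then-rename technique that such a helper usually relies on can be sketched as follows. Everything here is an assumption for illustration: `overwrite_atomically` is a hypothetical name, the code is synchronous and uses only `std`, and fsync on a directory handle is a Unix behaviour.

```rust
use std::fs::{self, File, OpenOptions};
use std::io::{self, Write};
use std::path::Path;

/// Hypothetical sketch of a crash-safe overwrite; not the pageserver implementation.
/// After a crash, `final_path` holds either the old or the new content, never a mix.
fn overwrite_atomically(final_path: &Path, tmp_path: &Path, content: &[u8]) -> io::Result<()> {
    let parent = final_path
        .parent()
        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "path has no parent"))?;

    // A temp file may be left over from an earlier crashed attempt; discard it.
    match fs::remove_file(tmp_path) {
        Ok(()) => {}
        Err(e) if e.kind() == io::ErrorKind::NotFound => {}
        Err(e) => return Err(e),
    }

    // 1. Write the full content to the temp file and make it durable.
    let mut file = OpenOptions::new().write(true).create_new(true).open(tmp_path)?;
    file.write_all(content)?;
    file.sync_all()?;
    drop(file);

    // 2. Atomically replace the target (rename within one filesystem).
    fs::rename(tmp_path, final_path)?;

    // 3. fsync the parent directory so the rename itself survives a crash (Unix).
    File::open(parent)?.sync_all()?;
    Ok(())
}
```

Compared with the removed code path, this shape needs no `first_save` flag: the same sequence works whether or not the target file already exists, which matches the parameter being dropped from `save_metadata`.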
diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs
index 87617b544c..72d150e0eb 100644
--- a/pageserver/src/tenant/mgr.rs
+++ b/pageserver/src/tenant/mgr.rs
@@ -22,8 +22,9 @@ use crate::task_mgr::{self, TaskKind};
use crate::tenant::config::TenantConfOpt;
use crate::tenant::delete::DeleteTenantFlow;
use crate::tenant::{create_tenant_files, CreateTenantFilesMode, Tenant, TenantState};
-use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME};
+use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME, TEMP_FILE_SUFFIX};
+use utils::crashsafe::path_with_suffix_extension;
use utils::fs_ext::PathExt;
use utils::generation::Generation;
use utils::id::{TenantId, TimelineId};
@@ -60,6 +61,29 @@ impl TenantsMap {
}
}
+/// This is "safe" in that it won't leave behind a partially deleted directory
+/// at the original path, because we rename with TEMP_FILE_SUFFIX before starting deleting
+/// the contents.
+///
+/// This is pageserver-specific, as it relies on future processes after a crash to check
+/// for TEMP_FILE_SUFFIX when loading things.
+async fn safe_remove_tenant_dir_all(path: impl AsRef<Path>) -> std::io::Result<()> {
+ let parent = path
+ .as_ref()
+ .parent()
+ // It is invalid to call this function with a relative path. Tenant directories
+ // should always have a parent.
+ .ok_or(std::io::Error::new(
+ std::io::ErrorKind::InvalidInput,
+ "Path must be absolute",
+ ))?;
+
+ let tmp_path = path_with_suffix_extension(&path, TEMP_FILE_SUFFIX);
+ fs::rename(&path, &tmp_path).await?;
+ fs::File::open(parent).await?.sync_all().await?;
+ fs::remove_dir_all(tmp_path).await
+}
+
static TENANTS: Lazy<RwLock<TenantsMap>> = Lazy::new(|| RwLock::new(TenantsMap::Initializing));
/// Initialize repositories with locally available timelines.
@@ -92,6 +116,8 @@ pub async fn init_tenant_mgr(
"Found temporary tenant directory, removing: {}",
tenant_dir_path.display()
);
+ // No need to use safe_remove_tenant_dir_all because this is already
+ // a temporary path
if let Err(e) = fs::remove_dir_all(&tenant_dir_path).await {
error!(
"Failed to remove temporary directory '{}': {:?}",
@@ -361,11 +387,11 @@ pub async fn create_tenant(
remote_storage: Option<GenericRemoteStorage>,
ctx: &RequestContext,
) -> Result<Arc<Tenant>, TenantMapInsertError> {
- tenant_map_insert(tenant_id, || {
+ tenant_map_insert(tenant_id, || async {
// We're holding the tenants lock in write mode while doing local IO.
// If this section ever becomes contentious, introduce a new `TenantState::Creating`
// and do the work in that state.
- let tenant_directory = super::create_tenant_files(conf, tenant_conf, &tenant_id, CreateTenantFilesMode::Create)?;
+ let tenant_directory = super::create_tenant_files(conf, tenant_conf, &tenant_id, CreateTenantFilesMode::Create).await?;
// TODO: tenant directory remains on disk if we bail out from here on.
// See https://github.com/neondatabase/neon/issues/4233
@@ -404,7 +430,8 @@ pub async fn set_new_tenant_config(
let tenant = get_tenant(tenant_id, true).await?;
let tenant_config_path = conf.tenant_config_path(&tenant_id);
- Tenant::persist_tenant_config(&tenant_id, &tenant_config_path, new_tenant_conf, false)
+ Tenant::persist_tenant_config(&tenant_id, &tenant_config_path, new_tenant_conf)
+ .await
.map_err(SetNewTenantConfigError::Persist)?;
tenant.set_new_tenant_config(new_tenant_conf);
Ok(())
@@ -490,7 +517,7 @@ async fn detach_tenant0(
) -> Result<(), TenantStateError> {
let local_files_cleanup_operation = |tenant_id_to_clean| async move {
let local_tenant_directory = conf.tenant_path(&tenant_id_to_clean);
- fs::remove_dir_all(&local_tenant_directory)
+ safe_remove_tenant_dir_all(&local_tenant_directory)
.await
.with_context(|| {
format!("local tenant directory {local_tenant_directory:?} removal")
@@ -525,7 +552,7 @@ pub async fn load_tenant(
remote_storage: Option<GenericRemoteStorage>,
ctx: &RequestContext,
) -> Result<(), TenantMapInsertError> {
- tenant_map_insert(tenant_id, || {
+ tenant_map_insert(tenant_id, || async {
let tenant_path = conf.tenant_path(&tenant_id);
let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_id);
if tenant_ignore_mark.exists() {
@@ -606,8 +633,8 @@ pub async fn attach_tenant(
remote_storage: GenericRemoteStorage,
ctx: &RequestContext,
) -> Result<(), TenantMapInsertError> {
- tenant_map_insert(tenant_id, || {
- let tenant_dir = create_tenant_files(conf, tenant_conf, &tenant_id, CreateTenantFilesMode::Attach)?;
+ tenant_map_insert(tenant_id, || async {
+ let tenant_dir = create_tenant_files(conf, tenant_conf, &tenant_id, CreateTenantFilesMode::Attach).await?;
// TODO: tenant directory remains on disk if we bail out from here on.
// See https://github.com/neondatabase/neon/issues/4233
@@ -655,12 +682,13 @@ pub enum TenantMapInsertError {
///
/// NB: the closure should return quickly because the current implementation of tenants map
/// serializes access through an `RwLock`.
-async fn tenant_map_insert<F>(
+async fn tenant_map_insert<F, R>(
tenant_id: TenantId,
insert_fn: F,
) -> Result<Arc<Tenant>, TenantMapInsertError>
where
- F: FnOnce() -> anyhow::Result<Arc<Tenant>>,
+ F: FnOnce() -> R,
+ R: std::future::Future