Compare commits

...

166 Commits

Author SHA1 Message Date
Joonas Koivunen
396e6a4b62 test: rename previous test, cleanup, still does not work 2023-08-30 14:19:54 +03:00
Joonas Koivunen
3295d6245a fix: provide better context for the other test 2023-08-30 14:19:54 +03:00
Joonas Koivunen
a4bfdd8013 test: actually duplicate L1 layer in test 2023-08-30 14:19:54 +03:00
Joonas Koivunen
cd869e5737 handle duplicate l1s almost safely
there is still the possibility of the old layer not really being
readable or returning bogus values if the internal representation has
shifted. cannot do anything for that... unless we rename them away, or
something like that.
2023-08-30 14:19:54 +03:00
Joonas Koivunen
544136f13f fix: require ResidentLayer to keep resident
it only matters when calling this; it can get "evicted" afterwards, even
though its file will probably not be readable.
2023-08-30 10:31:56 +03:00
Joonas Koivunen
1cbb7fccaa allow keeping a duplicate resident
except that the residentness is not enforced right now.
2023-08-30 10:02:08 +03:00
Joonas Koivunen
9ab35cc5d2 fix: upload compacted layers 2023-08-30 09:45:37 +03:00
Joonas Koivunen
bb03c74217 refactor: rename for_written_tempfile -> finish_creating 2023-08-29 15:57:34 +03:00
Joonas Koivunen
774f34d778 refactor: move all metrics updates to layer
this fixes some missing increments for num_persistent_files_created,
persistent_bytes_written and removes double entries for residence
events.
2023-08-29 15:45:03 +03:00
Joonas Koivunen
f5a171076b doc: comment about chance of both evictions selecting same layer 2023-08-29 15:20:25 +03:00
Joonas Koivunen
881fdbc04c doc: fix broken link 2023-08-29 14:34:06 +03:00
Joonas Koivunen
8a27e58894 doc: fix outdated commit 2023-08-29 14:19:53 +03:00
Joonas Koivunen
b4c81f7dff refactor: rename LayerInner::on_drop to on_downloaded_layer_drop 2023-08-29 14:13:09 +03:00
Joonas Koivunen
9151b71b19 doc: fix link 2023-08-29 13:25:36 +03:00
Joonas Koivunen
8348bc9b1a test: allow witnessing stopping before broken 2023-08-29 12:24:39 +03:00
Joonas Koivunen
6e47438f43 fixup residentlayer comment 2023-08-29 12:16:45 +03:00
Joonas Koivunen
579e85e92d doc: assert &Arc<LayerInner> and DownloadedLayer::owner 2023-08-29 12:16:27 +03:00
Joonas Koivunen
53d2b48ea2 doc: adjust more LayerInner::on_drop 2023-08-29 12:15:46 +03:00
Joonas Koivunen
ac6604b6ed doc: adjust while in queue 2023-08-29 12:06:47 +03:00
Joonas Koivunen
91b64427ed doc: remove comment about backoff 2023-08-29 12:04:34 +03:00
Joonas Koivunen
ffe0f90083 doc: when => while 2023-08-29 11:59:24 +03:00
Joonas Koivunen
59b5a55dbf doc: create guard => new download has been started 2023-08-29 11:57:03 +03:00
Joonas Koivunen
b9290c7005 doc: simplify comment 2023-08-29 11:55:53 +03:00
Joonas Koivunen
302a58e8ea image/deltalayer: shuffle comments around 2023-08-29 11:55:08 +03:00
Joonas Koivunen
def51361ae doc: drop comment in favor of drop_eviction_guard 2023-08-29 11:14:12 +03:00
Joonas Koivunen
d67d4b3eee doc: add validation 2023-08-29 11:11:27 +03:00
Joonas Koivunen
cd1b548a8f doc: explain DownloadedLayer::get owner param 2023-08-29 11:07:14 +03:00
Joonas Koivunen
282372aa5a reorder: 1. DownloadedLayer, 2. ResidentLayer 2023-08-29 11:06:51 +03:00
Joonas Koivunen
1f0cd3b50e doc: note running without remote storage again 2023-08-29 11:06:29 +03:00
Joonas Koivunen
4973419a38 doc: cancellation safety with evict_and_wait 2023-08-29 11:06:12 +03:00
Joonas Koivunen
44ef584842 doc: residentlayer vs. downloadedlayer and eviction 2023-08-29 10:51:08 +03:00
Joonas Koivunen
55c42da91b info: stop using stat
we no longer need to use it because in later versions we initialize
to the correct on-filesystem state with Layer::for_{resident,evicted}.
2023-08-29 10:49:41 +03:00
Joonas Koivunen
5c343af807 doc: check_expected_download 2023-08-29 10:49:25 +03:00
Joonas Koivunen
87ecb2e6ca reorder: get and get_or_apply_evictedness 2023-08-29 10:49:13 +03:00
Joonas Koivunen
c659d0f218 fix: subscribe before evicting 2023-08-29 10:48:55 +03:00
Joonas Koivunen
9f7688b1d2 doc: another pass on LayerInner 2023-08-29 10:48:36 +03:00
Joonas Koivunen
3edff352b5 doc: explain what the consecutive failures are for 2023-08-29 10:08:53 +03:00
Joonas Koivunen
08680f6591 doc: typo 2023-08-28 16:55:28 +03:00
Joonas Koivunen
55105ad1c3 refactor: Result<(), NeedsDownload> 2023-08-28 16:53:32 +03:00
Joonas Koivunen
df328758f0 refactor: simplify schedule upload and tests 2023-08-28 16:33:01 +03:00
Joonas Koivunen
d5ac61d566 doc: add cancellation safe comment 2023-08-28 14:41:43 +03:00
Joonas Koivunen
355ea43ac7 eviction_task: remove confusing drop(candidates) 2023-08-28 14:36:34 +03:00
Joonas Koivunen
6f0ab326b4 doc: inmemlayer: cleanup comments 2023-08-28 14:24:38 +03:00
Joonas Koivunen
c06a4fb511 doc: delete fixme about gentlemans agreements and strings 2023-08-28 14:19:11 +03:00
Joonas Koivunen
9a714ac6b8 doc: link to LayerMap::search 2023-08-28 14:10:49 +03:00
Joonas Koivunen
8c21edc9c5 doc: cleanup, add missing "the" 2023-08-28 14:10:49 +03:00
Joonas Koivunen
6ff324a12d doc: link to inmemorylayer 2023-08-28 14:10:49 +03:00
Joonas Koivunen
090f9a5a80 doc: remove obsolete comment 2023-08-28 14:01:56 +03:00
Joonas Koivunen
74aefa0b07 heavier_once_cell: explain away the unsynchornized 2023-08-28 13:59:08 +03:00
Joonas Koivunen
ce1abef0bd doc: fix typo 2023-08-28 13:53:57 +03:00
Joonas Koivunen
53eacacb6b botched rebase: lost impl AsRe<DeltaLayerInner> 2023-08-28 13:47:03 +03:00
Joonas Koivunen
d40b9a515a refactor: split guard_against_eviction into three
- download
- keep_resident
- download_and_keep_resident

No need to bool enum.
2023-08-28 13:35:53 +03:00
Joonas Koivunen
7c2f687bd6 rename: garbage_collect => &_on_drop 2023-08-28 13:35:53 +03:00
Joonas Koivunen
effc151244 fix: allow dropping from UploadQueue by spawn_blocking 2023-08-28 13:35:53 +03:00
Joonas Koivunen
bdfc895642 layer: remove dead comment and code 2023-08-28 13:35:53 +03:00
Joonas Koivunen
96161c8cfd restore Layer::dump 2023-08-28 13:35:47 +03:00
Joonas Koivunen
da99399d16 doc: minor fixes 2023-08-28 13:34:34 +03:00
Joonas Koivunen
7eb74d3720 test: fix allowed error typo 2023-08-28 13:34:34 +03:00
Joonas Koivunen
7b39681caf fix: delete and only then report evicted 2023-08-28 13:34:34 +03:00
Joonas Koivunen
5ff5c580ad test: fix test_timeline_deletion_with_files_stuck_in_upload_queue string change 2023-08-28 13:34:34 +03:00
Joonas Koivunen
6ccc6cbc69 refactor: minor cleanup, doc 2023-08-28 13:34:34 +03:00
Joonas Koivunen
2ecf6727c5 refactor: split evicting 2023-08-28 13:34:34 +03:00
Joonas Koivunen
f957616f1c refactor: split get_or_maybe_download 2023-08-28 13:34:34 +03:00
Joonas Koivunen
b154a5e908 doc: few touches 2023-08-28 13:34:34 +03:00
Joonas Koivunen
c66e859bcc try to apply backoff *after* download
might not work as we could get cancelled, but doing it right before
seems wrong as well. We already retry the download.
2023-08-28 13:34:34 +03:00
Joonas Koivunen
1559ef36af fix: rename the written out file in Layer ctor 2023-08-28 13:34:34 +03:00
Joonas Koivunen
ef1c3d3914 test: use guard_against_eviction from outside 2023-08-28 13:34:34 +03:00
Joonas Koivunen
83e28083b0 test: migrate to Layer::for_resident 2023-08-28 13:34:34 +03:00
Joonas Koivunen
0950f8c752 refactor: Layer initialization 2023-08-28 13:34:34 +03:00
Joonas Koivunen
41d36b65e2 move Layer and all to storage_layer::layer 2023-08-28 13:34:34 +03:00
Joonas Koivunen
d8cb81118a reorder, get rid of TODO 2023-08-28 13:34:34 +03:00
Joonas Koivunen
ecf34bb3e4 blanket rename 2023-08-28 13:34:34 +03:00
Joonas Koivunen
fb4d404553 refactor: LayerManager, remove arc 2023-08-28 13:34:34 +03:00
Joonas Koivunen
450f79b3f5 refactor: fix residency and metrics to layermanager 2023-08-28 13:34:34 +03:00
Joonas Koivunen
a47b7d1d4c LayerE::drop comments 2023-08-28 13:34:34 +03:00
Joonas Koivunen
b01022d8df drop TODO about better load time api 2023-08-28 13:34:34 +03:00
Joonas Koivunen
0155ff95e7 doc: address review comment by jcsp 2023-08-28 13:34:34 +03:00
Joonas Koivunen
46b6a1a5e8 review comment: xref tested string 2023-08-28 13:34:34 +03:00
Joonas Koivunen
290f121b59 ======= address reviews 2023-08-28 13:34:34 +03:00
Joonas Koivunen
786ddeff62 layere: comment cleanup 2023-08-28 13:34:34 +03:00
Joonas Koivunen
732e155b8e fixup remove ability to have 'static DeltaEntry 2023-08-28 13:34:32 +03:00
Joonas Koivunen
e10c5b0a9b needsdownload: remove unused 2023-08-28 12:49:56 +03:00
Joonas Koivunen
b608eaa410 layere: remove unused LayerE::new 2023-08-28 12:49:56 +03:00
Joonas Koivunen
7b2ae073f0 cleanup unused code, comments 2023-08-28 12:49:56 +03:00
Joonas Koivunen
d4a7bdad55 fix: move metric updates to finish_compact_l0 2023-08-28 12:49:56 +03:00
Joonas Koivunen
96c9fd330c fix: duplicate residency events on flushing l0 2023-08-28 12:49:56 +03:00
Joonas Koivunen
39b85cc6fd layere: record residency changes with download/evict 2023-08-28 12:49:56 +03:00
Joonas Koivunen
eccb868a50 doc: consider cancellation and redownload 2023-08-28 12:49:56 +03:00
Joonas Koivunen
de93c70f2f provide and use LayerE::for_evicted 2023-08-28 12:49:56 +03:00
Joonas Koivunen
72430eb539 layere: remove impossible error case 2023-08-28 12:49:56 +03:00
Joonas Koivunen
e2443a0147 remove ability to have 'static DeltaEntry 2023-08-28 12:49:55 +03:00
Joonas Koivunen
e5d00b6c2a fix: compaction can and should use borrowed DeltaEntrys
otherwise we risk evicting the L0 while we are reading different blobs
out of it.
2023-08-28 12:46:55 +03:00
Joonas Koivunen
dc97970215 chore(ref): make pub 2023-08-28 12:46:53 +03:00
Joonas Koivunen
bc5a643c19 fix: use new LayerE::for_resident 2023-08-28 12:43:11 +03:00
Joonas Koivunen
b1134f6857 layere: add new ctor 2023-08-28 12:43:11 +03:00
Joonas Koivunen
04ab9b78fe test: add more allowed outcomes
I cannot see a quick fix to make one of them the winner, nor a reason why it
should be done; in the worst case there could be double accounting for some
evicted layer should the two evict it at the same time.
2023-08-28 12:43:11 +03:00
Joonas Koivunen
fa0b881c4c remove previous generation of Layer, PersistentLayer
- remove get_value_reconstruct_data for Delta, Image
- remove unnecessary default trait method
- remove trait PersistentLayer
- remove unused {Delta,Image}Layer::new()
- continued dead code removal
- unify ImageLayer to be like DeltaLayer
- dead code and imports cleanup
- remove PathOrConfig
- correct few doc links re: Layer removal
2023-08-28 12:43:11 +03:00
Joonas Koivunen
106bda1ef9 layer_map: finally remove replacement related tests 2023-08-28 12:43:11 +03:00
Joonas Koivunen
c2de71e1fe test: fix test_timeline_deletion_with_files_stuck_in_upload_queue, global allowed_error 2023-08-28 12:43:11 +03:00
Joonas Koivunen
3eba531c3d test: fixup layere error type introduction, string change 2023-08-28 12:43:11 +03:00
Joonas Koivunen
088bea8680 test: add hint about mismatch cause 2023-08-28 12:43:11 +03:00
Joonas Koivunen
a9b0ac92bc layere: decrement resident size if removed
make the problem of not knowing more explicit.
2023-08-28 12:43:11 +03:00
Joonas Koivunen
3045956ddd refactor(LayerE): use new internal api 2023-08-28 12:43:11 +03:00
Joonas Koivunen
599069b612 eviction: remove comment 2023-08-28 12:43:11 +03:00
Joonas Koivunen
54873844c2 layere: introduce internal error type 2023-08-28 12:43:11 +03:00
Joonas Koivunen
dfdd41a771 layere: move task_name closer 2023-08-28 12:43:11 +03:00
Joonas Koivunen
12763ca312 layere: reset wanted_evicted only if downloading 2023-08-28 12:43:11 +03:00
Joonas Koivunen
4c80c8c1ab test: fix changed string (no more remote layer remote)
this should be the only one:
https://neon-github-public-dev.s3.amazonaws.com/reports/pr-4938/5922954346/index.html#suites/7745dadbd815ab87f5798aa881796f47/96ce406b6d6a7427
2023-08-28 12:43:11 +03:00
Joonas Koivunen
acd2e7f222 timeline: fix test after wait_and_evict 2023-08-28 12:43:11 +03:00
Joonas Koivunen
a6b6dd2f36 timeline: reflect evict_and_wait in tests 2023-08-28 12:43:11 +03:00
Joonas Koivunen
0fd14ad74b timeline: remove GenericRemoteStorage exposure 2023-08-28 12:43:11 +03:00
Joonas Koivunen
52eaa52573 wip: get rid of LayerE::evict 2023-08-28 12:43:11 +03:00
Joonas Koivunen
1e33692c1c comment out the delta dumping test 2023-08-28 12:43:11 +03:00
Joonas Koivunen
f82ba477a4 http: limit genericremotestorage exposure 2023-08-28 12:43:11 +03:00
Joonas Koivunen
761644af25 dube: adjust comments 2023-08-28 12:43:11 +03:00
Joonas Koivunen
cd12d97ba7 dube: limit GenericRemoteStorage need 2023-08-28 12:43:11 +03:00
Joonas Koivunen
1e4ded860c refactor: move LayerE::get 2023-08-28 12:43:11 +03:00
Joonas Koivunen
a0f29853b3 layere: rewrite to heavier_once_cell 2023-08-28 12:43:11 +03:00
Joonas Koivunen
c4cdf747f8 add heavier_once_cell 2023-08-28 12:43:11 +03:00
Joonas Koivunen
e658f16810 test_download_remote_layers_api: fix expected string 2023-08-28 12:43:11 +03:00
Joonas Koivunen
a4b4305422 cleanup while hunting for test_gc_cutoff problem 2023-08-28 12:43:11 +03:00
Joonas Koivunen
d8807eb651 test: fix test_broken_timeline (string matching) 2023-08-28 12:43:11 +03:00
Joonas Koivunen
1500f711f3 test: this test no longer makes sense, we dont replace 2023-08-28 12:43:11 +03:00
Joonas Koivunen
dafa42eb71 test: fix test assuming race between compaction and upload 2023-08-28 12:43:11 +03:00
Joonas Koivunen
b5e5ead2ee delta_layer: allow unused load_keys 2023-08-28 12:43:09 +03:00
Joonas Koivunen
24251b8d17 botched rebase: added block_on code 2023-08-28 12:42:28 +03:00
Joonas Koivunen
e6378197a7 timeline: drop 2 indentation from get_reconstruct_value 2023-08-28 12:42:28 +03:00
Joonas Koivunen
4bb0cc2fe4 gc: reflect LayerE now managing remote client 2023-08-28 12:42:28 +03:00
Joonas Koivunen
9304f42ea5 tenant: remove unused imports 2023-08-28 12:42:28 +03:00
Joonas Koivunen
18f4eb2622 timeline: remove compare_arced_layers 2023-08-28 12:42:28 +03:00
Joonas Koivunen
2caa8bcc23 layer_manager: more clippy 2023-08-28 12:42:28 +03:00
Joonas Koivunen
bb222abde1 layer_manager: remove metrics, dependend by gc/compaction 2023-08-28 12:42:28 +03:00
Joonas Koivunen
dd2b4ad26f layer_manager: cleanup unused 2023-08-28 12:42:28 +03:00
Joonas Koivunen
b65cb8ea05 storage_layer: we no longer clone for residency change 2023-08-28 12:42:28 +03:00
Joonas Koivunen
3a7efc10a0 timeline: cleanup unused imports 2023-08-28 12:42:28 +03:00
Joonas Koivunen
a682de1dba Timeline: get_value_reconstruct_data: avoid warnings 2023-08-28 12:42:28 +03:00
Joonas Koivunen
45a542c335 tenant: pub(crate) 2023-08-28 12:42:28 +03:00
Joonas Koivunen
bdb98b288e ==== reprocessed 2023-08-28 12:42:28 +03:00
Joonas Koivunen
a06c8e9add inmemory_layer: drop impl Layer 2023-08-28 12:42:28 +03:00
Joonas Koivunen
18eefd61eb inmemory_layer: less warnings 2023-08-28 12:42:28 +03:00
Joonas Koivunen
a2de0574b5 inmemorylayer: flush integration, partial? 2023-08-28 12:42:28 +03:00
Joonas Koivunen
aa8e954197 layer_manager, layer_access_stats: remove witness sidequest
it was interfering with me moving responsibilities back and forth
2023-08-28 12:42:28 +03:00
Joonas Koivunen
30847e59b9 remove remote_layer instead of keep fixing 2023-08-28 12:42:28 +03:00
Joonas Koivunen
d930a581f8 layer_manager: compaction changes 2023-08-28 12:42:28 +03:00
Joonas Koivunen
931c22545b compaction: clippies 2023-08-28 12:42:28 +03:00
Joonas Koivunen
366f3c8ff8 compaction: reflect LayerE now managing remote client 2023-08-28 12:42:28 +03:00
Joonas Koivunen
26c39d7b4c timeline: pub(crate) compaction 2023-08-28 12:42:28 +03:00
Joonas Koivunen
6ffa5138ce compaction: upload index on success
otherwise test_gc_of_remote_layers fails
2023-08-28 12:42:28 +03:00
Joonas Koivunen
975e1558cc compaction: integration 2023-08-28 12:42:28 +03:00
Joonas Koivunen
82a955ebfe layer_manager: resident layer flush l0 changes 2023-08-28 12:42:28 +03:00
Joonas Koivunen
82596f8807 delta_layer: reflect non-async LayerE::for_written 2023-08-28 12:42:28 +03:00
Joonas Koivunen
e3e57579a1 integrate: download_all_layers
this time around with graceful cancellation.
2023-08-28 12:42:28 +03:00
Joonas Koivunen
56d551e6a3 ==== reprocessed 2023-08-28 12:42:28 +03:00
Joonas Koivunen
b4a0f8baf7 integrate: Timeline::get_value_reconstruct_data 2023-08-28 12:42:28 +03:00
Joonas Koivunen
2e686ed6ea eviction: integration
- evictiontask: remove unused imports
- eviction_task and dube: cleanup
- timeline: pub(crate) eviction
- timeline: adjust to "layere: adjust eviction"
- test: remove layer_eviction_aba_fails because it can no longer happen
- test: fix up evicts later test with ability to await for eviction
- eviction_task: more unused imports
- eviction: clippy
- eviction_task: more clippy
- fixup eviction: docs
- eviction: hold only Arc<Layer> after checking downloadedness
- refactor earlier eviction: use drop_eviction_guard instead
- dube: evict in spawned tasks
- timeline, eviction: evict in spawned
- eviction: add more errors
- evict_layers: remove witness
- eviction: post-witness forgotten panic
- eviction: remove blog references
2023-08-28 12:42:28 +03:00
Joonas Koivunen
c46f72d411 create_image_layers integration (multifile) 2023-08-28 12:42:28 +03:00
Joonas Koivunen
ed46713e5c remote_timeline_client: pub(crate) on upload 2023-08-28 12:42:28 +03:00
Joonas Koivunen
c228cb7b3f remote_timeline_client: continued integration work post ResidentLayer 2023-08-28 12:42:28 +03:00
Joonas Koivunen
c88e4a0974 layer_manager: integration 2023-08-28 12:42:28 +03:00
Joonas Koivunen
b71a2f4cb2 load_layer_map: integration 2023-08-28 12:42:23 +03:00
Joonas Koivunen
98a7b090de ========= layere 2023-08-28 12:41:31 +03:00
Joonas Koivunen
235a8cbd28 wip: LayerE 2023-08-28 12:41:31 +03:00
Joonas Koivunen
6afeb3c6f7 ========= unrelated 2023-08-28 12:41:31 +03:00
Joonas Koivunen
8c42aeac9f test: move log to assert message 2023-08-28 12:41:31 +03:00
Joonas Koivunen
f36658ac10 layerdesc: add from_filename 2023-08-28 12:41:31 +03:00
Joonas Koivunen
f8227da9da unrelated: layer_manager: avoid arc cloning 2023-08-28 12:41:31 +03:00
29 changed files with 2573 additions and 2333 deletions

View File

@@ -68,6 +68,8 @@ pub mod completion;
/// Reporting utilities
pub mod error;
pub mod sync;
/// This is a shortcut to embed git sha into binaries and avoid copying the same build script to all packages
///
/// we have several cases:

libs/utils/src/sync.rs Normal file
View File

@@ -0,0 +1 @@
pub mod heavier_once_cell;

View File

@@ -0,0 +1,306 @@
use std::sync::{Arc, Mutex, MutexGuard};
use tokio::sync::Semaphore;
/// Custom design like [`tokio::sync::OnceCell`] but using [`OwnedSemaphorePermit`] instead of
/// `SemaphorePermit`, allowing use of `take` which does not require holding an outer mutex guard
/// for the duration of initialization.
///
/// Has no unsafe, builds upon [`tokio::sync::Semaphore`] and [`std::sync::Mutex`].
///
/// [`OwnedSemaphorePermit`]: tokio::sync::OwnedSemaphorePermit
pub struct OnceCell<T> {
inner: Mutex<Inner<T>>,
}
impl<T> Default for OnceCell<T> {
/// Create new uninitialized [`OnceCell`].
fn default() -> Self {
Self {
inner: Default::default(),
}
}
}
/// Semaphore is the current state:
/// - open semaphore means the value is `None`, not yet initialized
/// - closed semaphore means the value has been initialized
#[derive(Debug)]
struct Inner<T> {
init_semaphore: Arc<Semaphore>,
value: Option<T>,
}
impl<T> Default for Inner<T> {
fn default() -> Self {
Self {
init_semaphore: Arc::new(Semaphore::new(1)),
value: None,
}
}
}
impl<T> OnceCell<T> {
/// Creates an already initialized `OnceCell` with the given value.
pub fn new(value: T) -> Self {
let sem = Semaphore::new(1);
sem.close();
Self {
inner: Mutex::new(Inner {
init_semaphore: Arc::new(sem),
value: Some(value),
}),
}
}
/// Returns a guard to an existing initialized value, or uniquely initializes the value before
/// returning the guard.
///
/// Initializing might wait on any existing [`Guard::take_and_deinit`] deinitialization.
///
/// Initialization is panic-safe and cancellation-safe.
pub async fn get_or_init<F, Fut, E>(&self, factory: F) -> Result<Guard<'_, T>, E>
where
F: FnOnce() -> Fut,
Fut: std::future::Future<Output = Result<T, E>>,
{
let sem = {
let guard = self.inner.lock().unwrap();
if guard.value.is_some() {
return Ok(Guard(guard));
}
guard.init_semaphore.clone()
};
let permit = sem.acquire_owned().await;
if permit.is_err() {
let guard = self.inner.lock().unwrap();
assert!(
guard.value.is_some(),
"semaphore got closed, must be initialized"
);
return Ok(Guard(guard));
} else {
// now we try
let value = factory().await?;
let mut guard = self.inner.lock().unwrap();
assert!(
guard.value.is_none(),
"we won permit, must not be initialized"
);
guard.value = Some(value);
guard.init_semaphore.close();
Ok(Guard(guard))
}
}
/// Returns a guard to an existing initialized value, if any.
pub fn get(&self) -> Option<Guard<'_, T>> {
let guard = self.inner.lock().unwrap();
if guard.value.is_some() {
Some(Guard(guard))
} else {
None
}
}
}
/// Uninteresting guard object to allow short-lived access to inspect or clone the held,
/// initialized value.
#[derive(Debug)]
pub struct Guard<'a, T>(MutexGuard<'a, Inner<T>>);
impl<T> std::ops::Deref for Guard<'_, T> {
type Target = T;
fn deref(&self) -> &Self::Target {
self.0
.value
.as_ref()
.expect("guard is not created unless value has been initialized")
}
}
impl<T> std::ops::DerefMut for Guard<'_, T> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.0
.value
.as_mut()
.expect("guard is not created unless value has been initialized")
}
}
impl<'a, T> Guard<'a, T> {
/// Take the current value, and a new permit for its deinitialization.
///
/// The permit will be on the semaphore that is part of the new internal value, and any following
/// [`OnceCell::get_or_init`] will wait on it to complete.
pub fn take_and_deinit(&mut self) -> (T, tokio::sync::OwnedSemaphorePermit) {
let mut swapped = Inner::default();
let permit = swapped
.init_semaphore
.clone()
.try_acquire_owned()
.expect("we just created this");
std::mem::swap(&mut *self.0, &mut swapped);
swapped
.value
.map(|v| (v, permit))
.expect("guard is not created unless value has been initialized")
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::{
convert::Infallible,
sync::atomic::{AtomicUsize, Ordering},
time::Duration,
};
#[tokio::test]
async fn many_initializers() {
#[derive(Default, Debug)]
struct Counters {
factory_got_to_run: AtomicUsize,
future_polled: AtomicUsize,
winners: AtomicUsize,
}
let initializers = 100;
let cell = Arc::new(OnceCell::default());
let counters = Arc::new(Counters::default());
let barrier = Arc::new(tokio::sync::Barrier::new(initializers + 1));
let mut js = tokio::task::JoinSet::new();
for i in 0..initializers {
js.spawn({
let cell = cell.clone();
let counters = counters.clone();
let barrier = barrier.clone();
async move {
barrier.wait().await;
let won = {
let g = cell
.get_or_init(|| {
counters.factory_got_to_run.fetch_add(1, Ordering::Relaxed);
async {
counters.future_polled.fetch_add(1, Ordering::Relaxed);
Ok::<_, Infallible>(i)
}
})
.await
.unwrap();
*g == i
};
if won {
counters.winners.fetch_add(1, Ordering::Relaxed);
}
}
});
}
barrier.wait().await;
while let Some(next) = js.join_next().await {
next.expect("no panics expected");
}
let mut counters = Arc::try_unwrap(counters).unwrap();
assert_eq!(*counters.factory_got_to_run.get_mut(), 1);
assert_eq!(*counters.future_polled.get_mut(), 1);
assert_eq!(*counters.winners.get_mut(), 1);
}
#[tokio::test(start_paused = true)]
async fn reinit_waits_for_deinit() {
// with tokio::time paused, we will "sleep" for 1s while holding the reinitialization permit
let sleep_for = Duration::from_secs(1);
let initial = 42;
let reinit = 1;
let cell = Arc::new(OnceCell::new(initial));
let deinitialization_started = Arc::new(tokio::sync::Barrier::new(2));
let jh = tokio::spawn({
let cell = cell.clone();
let deinitialization_started = deinitialization_started.clone();
async move {
let (answer, _permit) = cell.get().expect("initialized to value").take_and_deinit();
assert_eq!(answer, initial);
deinitialization_started.wait().await;
tokio::time::sleep(sleep_for).await;
}
});
deinitialization_started.wait().await;
let started_at = tokio::time::Instant::now();
cell.get_or_init(|| async { Ok::<_, Infallible>(reinit) })
.await
.unwrap();
let elapsed = started_at.elapsed();
assert!(
elapsed >= sleep_for,
"initialization should had taken at least the time time slept with permit"
);
jh.await.unwrap();
assert_eq!(*cell.get().unwrap(), reinit);
}
#[tokio::test]
async fn initialization_attemptable_until_ok() {
let cell = OnceCell::default();
for _ in 0..10 {
cell.get_or_init(|| async { Err("whatever error") })
.await
.unwrap_err();
}
let g = cell
.get_or_init(|| async { Ok::<_, Infallible>("finally success") })
.await
.unwrap();
assert_eq!(*g, "finally success");
}
#[tokio::test]
async fn initialization_is_cancellation_safe() {
let cell = OnceCell::default();
let barrier = tokio::sync::Barrier::new(2);
let initializer = cell.get_or_init(|| async {
barrier.wait().await;
futures::future::pending::<()>().await;
Ok::<_, Infallible>("never reached")
});
tokio::select! {
_ = initializer => { unreachable!("cannot complete; stuck in pending().await") },
_ = barrier.wait() => {}
};
// now initializer is dropped
assert!(cell.get().is_none());
let g = cell
.get_or_init(|| async { Ok::<_, Infallible>("now initialized") })
.await
.unwrap();
assert_eq!(*g, "now initialized");
}
}
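
The `heavier_once_cell::OnceCell` added above is the primitive the rewritten `Layer` builds on for its download/evict state. A minimal usage sketch of the API shown in this file (assuming the crate is importable as `utils` and a tokio runtime is available; this sketch is not part of the diff):

use std::convert::Infallible;
use utils::sync::heavier_once_cell::OnceCell;

#[tokio::main]
async fn main() {
    let cell: OnceCell<String> = OnceCell::default();

    // The first caller runs the factory; concurrent callers wait, then reuse the value.
    let guard = cell
        .get_or_init(|| async { Ok::<_, Infallible>("downloaded".to_string()) })
        .await
        .unwrap();
    assert_eq!(&*guard, "downloaded");
    drop(guard); // release the inner mutex guard before touching the cell again

    // Deinitialize: the returned permit blocks re-initialization until it is dropped.
    let (value, permit) = cell.get().expect("initialized above").take_and_deinit();
    assert_eq!(value, "downloaded");
    assert!(cell.get().is_none());
    drop(permit);

    // Initialization can be retried until a factory returns Ok.
    let guard = cell
        .get_or_init(|| async { Ok::<_, Infallible>("downloaded again".to_string()) })
        .await
        .unwrap();
    assert_eq!(&*guard, "downloaded again");
}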

View File

@@ -60,7 +60,11 @@ use utils::serde_percent::Percent;
use crate::{
config::PageServerConf,
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
tenant::{self, storage_layer::PersistentLayer, timeline::EvictionError, Timeline},
tenant::{
self,
storage_layer::{AsLayerDesc, EvictionError, Layer},
Timeline,
},
};
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
@@ -108,7 +112,7 @@ pub fn launch_disk_usage_global_eviction_task(
_ = background_jobs_barrier.wait() => { }
};
disk_usage_eviction_task(&state, task_config, storage, &conf.tenants_path(), cancel)
disk_usage_eviction_task(&state, task_config, &storage, &conf.tenants_path(), cancel)
.await;
Ok(())
},
@@ -121,7 +125,7 @@ pub fn launch_disk_usage_global_eviction_task(
async fn disk_usage_eviction_task(
state: &State,
task_config: &DiskUsageEvictionTaskConfig,
storage: GenericRemoteStorage,
_storage: &GenericRemoteStorage,
tenants_dir: &Path,
cancel: CancellationToken,
) {
@@ -145,14 +149,8 @@ async fn disk_usage_eviction_task(
let start = Instant::now();
async {
let res = disk_usage_eviction_task_iteration(
state,
task_config,
&storage,
tenants_dir,
&cancel,
)
.await;
let res =
disk_usage_eviction_task_iteration(state, task_config, tenants_dir, &cancel).await;
match res {
Ok(()) => {}
@@ -183,13 +181,12 @@ pub trait Usage: Clone + Copy + std::fmt::Debug {
async fn disk_usage_eviction_task_iteration(
state: &State,
task_config: &DiskUsageEvictionTaskConfig,
storage: &GenericRemoteStorage,
tenants_dir: &Path,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
let usage_pre = filesystem_level_usage::get(tenants_dir, task_config)
.context("get filesystem-level disk usage before evictions")?;
let res = disk_usage_eviction_task_iteration_impl(state, storage, usage_pre, cancel).await;
let res = disk_usage_eviction_task_iteration_impl(state, usage_pre, cancel).await;
match res {
Ok(outcome) => {
debug!(?outcome, "disk_usage_eviction_iteration finished");
@@ -273,7 +270,6 @@ struct LayerCount {
pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
state: &State,
storage: &GenericRemoteStorage,
usage_pre: U,
cancel: &CancellationToken,
) -> anyhow::Result<IterationOutcome<U>> {
@@ -330,9 +326,10 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
// If we get far enough in the list that we start to evict layers that are below
// the tenant's min-resident-size threshold, print a warning, and memorize the disk
// usage at that point, in 'usage_planned_min_resident_size_respecting'.
let mut batched: HashMap<_, Vec<Arc<dyn PersistentLayer>>> = HashMap::new();
let mut batched: HashMap<_, Vec<_>> = HashMap::new();
let mut warned = None;
let mut usage_planned = usage_pre;
let mut max_batch_size = 0;
for (i, (partition, candidate)) in candidates.into_iter().enumerate() {
if !usage_planned.has_pressure() {
debug!(
@@ -349,10 +346,15 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
usage_planned.add_available_bytes(candidate.layer.layer_desc().file_size);
batched
.entry(TimelineKey(candidate.timeline))
.or_default()
.push(candidate.layer);
let batch = batched.entry(TimelineKey(candidate.timeline)).or_default();
// the semaphore will later be used to limit eviction concurrency, and it can express at
// most u32::MAX permits. It is unlikely we would have u32::MAX layers to evict,
// but fail gracefully by not making batches larger.
if batch.len() < u32::MAX as usize {
batch.push(candidate.layer);
max_batch_size = max_batch_size.max(batch.len());
}
}
let usage_planned = match warned {
@@ -369,64 +371,101 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
// phase2: evict victims batched by timeline
// After the loop, `usage_assumed` is the post-eviction usage,
// according to internal accounting.
let mut usage_assumed = usage_pre;
let mut evictions_failed = LayerCount::default();
let mut js = tokio::task::JoinSet::new();
// ratelimit to 1k files or any higher max batch size
let limit = Arc::new(tokio::sync::Semaphore::new(1000.max(max_batch_size)));
for (timeline, batch) in batched {
let tenant_id = timeline.tenant_id;
let timeline_id = timeline.timeline_id;
let batch_size = batch.len();
let batch_size =
u32::try_from(batch.len()).expect("batch size limited to u32::MAX during partitioning");
// I dislike the naming of `available_permits`, but it means the current total number of permits
// because permits can be added
assert!(batch_size as usize <= limit.available_permits());
debug!(%timeline_id, "evicting batch for timeline");
async {
let results = timeline.evict_layers(storage, &batch, cancel.clone()).await;
let evict = {
let limit = limit.clone();
let cancel = cancel.clone();
async move {
let mut evicted_bytes = 0;
let mut evictions_failed = LayerCount::default();
match results {
Err(e) => {
warn!("failed to evict batch: {:#}", e);
}
Ok(results) => {
assert_eq!(results.len(), batch.len());
for (result, layer) in results.into_iter().zip(batch.iter()) {
let file_size = layer.layer_desc().file_size;
match result {
Some(Ok(())) => {
usage_assumed.add_available_bytes(file_size);
}
Some(Err(EvictionError::CannotEvictRemoteLayer)) => {
unreachable!("get_local_layers_for_disk_usage_eviction finds only local layers")
}
Some(Err(EvictionError::FileNotFound)) => {
evictions_failed.file_sizes += file_size;
evictions_failed.count += 1;
}
Some(Err(
e @ EvictionError::LayerNotFound(_)
| e @ EvictionError::StatFailed(_),
)) => {
let e = utils::error::report_compact_sources(&e);
warn!(%layer, "failed to evict layer: {e}");
evictions_failed.file_sizes += file_size;
evictions_failed.count += 1;
}
None => {
assert!(cancel.is_cancelled());
return;
let Ok(_permit) = limit.acquire_many_owned(batch_size).await else {
// semaphore closing means cancelled
return (evicted_bytes, evictions_failed);
};
let results = timeline.evict_layers(&batch, &cancel).await;
match results {
Ok(results) => {
assert_eq!(results.len(), batch.len());
for (result, layer) in results.into_iter().zip(batch.iter()) {
let file_size = layer.layer_desc().file_size;
match result {
Some(Ok(())) => {
evicted_bytes += file_size;
}
Some(Err(EvictionError::NotFound | EvictionError::Downloaded)) => {
evictions_failed.file_sizes += file_size;
evictions_failed.count += 1;
}
None => {
assert!(cancel.is_cancelled());
}
}
}
}
Err(e) => {
warn!("failed to evict batch: {:#}", e);
}
}
(evicted_bytes, evictions_failed)
}
}
.instrument(tracing::info_span!("evict_batch", %tenant_id, %timeline_id, batch_size))
.await;
.instrument(tracing::info_span!("evict_batch", %tenant_id, %timeline_id, batch_size));
if cancel.is_cancelled() {
js.spawn(evict);
// spawning multiple thousands of these is essentially blocking, so give the already
// spawned tasks a chance to make progress
tokio::task::yield_now().await;
}
let join_all = async move {
// After the evictions, `usage_assumed` is the post-eviction usage,
// according to internal accounting.
let mut usage_assumed = usage_pre;
let mut evictions_failed = LayerCount::default();
while let Some(res) = js.join_next().await {
match res {
Ok((evicted_bytes, failed)) => {
usage_assumed.add_available_bytes(evicted_bytes);
evictions_failed.file_sizes += failed.file_sizes;
evictions_failed.count += failed.count;
}
Err(je) if je.is_cancelled() => unreachable!("not used"),
Err(je) if je.is_panic() => { /* already logged */ }
Err(je) => tracing::error!("unknown JoinError: {je:?}"),
}
}
(usage_assumed, evictions_failed)
};
let (usage_assumed, evictions_failed) = tokio::select! {
tuple = join_all => { tuple },
_ = cancel.cancelled() => {
// close the semaphore to stop any pending acquires
limit.close();
return Ok(IterationOutcome::Cancelled);
}
}
};
Ok(IterationOutcome::Finished(IterationOutcomeFinished {
before: usage_pre,
@@ -441,7 +480,7 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
#[derive(Clone)]
struct EvictionCandidate {
timeline: Arc<Timeline>,
layer: Arc<dyn PersistentLayer>,
layer: Layer,
last_activity_ts: SystemTime,
}
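
The rewritten loop above stops awaiting each timeline's eviction batch inline and instead spawns the batches into a `JoinSet`, gated by a shared `Semaphore` that is closed on cancellation. A self-contained sketch of just that concurrency pattern (hypothetical `Batch` and `evict_one` stand-ins, assuming the `tokio` and `tokio-util` crates; not pageserver code):

use std::sync::Arc;
use tokio::{sync::Semaphore, task::JoinSet};
use tokio_util::sync::CancellationToken;

struct Batch {
    // hypothetical: file sizes of the layers to evict in one timeline's batch
    layer_sizes: Vec<u64>,
}

async fn evict_one(_size: u64) -> Result<(), ()> {
    Ok(()) // hypothetical eviction of a single layer
}

async fn evict_batched(batches: Vec<Batch>, cancel: CancellationToken) -> u64 {
    let max_batch = batches.iter().map(|b| b.layer_sizes.len()).max().unwrap_or(0);
    // permits are acquired per batch; batch sizes are capped to u32 upstream
    let limit = Arc::new(Semaphore::new(1000.max(max_batch)));

    let mut js = JoinSet::new();
    for batch in batches {
        let limit = limit.clone();
        js.spawn(async move {
            let batch_size = u32::try_from(batch.layer_sizes.len()).expect("capped to u32");
            // a closed semaphore fails the acquire, which doubles as cancellation
            let Ok(_permit) = limit.acquire_many_owned(batch_size).await else {
                return 0;
            };
            let mut evicted_bytes = 0;
            for size in batch.layer_sizes {
                if evict_one(size).await.is_ok() {
                    evicted_bytes += size;
                }
            }
            evicted_bytes
        });
        // spawning thousands of tasks is essentially blocking; let spawned tasks progress
        tokio::task::yield_now().await;
    }

    let join_all = async {
        let mut total = 0;
        while let Some(res) = js.join_next().await {
            total += res.unwrap_or(0);
        }
        total
    };

    tokio::select! {
        total = join_all => total,
        _ = cancel.cancelled() => {
            // stop pending acquires; batches still running are aborted when `js` drops
            limit.close();
            0
        }
    }
}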

View File

@@ -1028,7 +1028,7 @@ async fn timeline_compact_handler(
timeline
.compact(&cancel, &ctx)
.await
.map_err(ApiError::InternalServerError)?;
.map_err(|e| ApiError::InternalServerError(e.into()))?;
json_response(StatusCode::OK, ())
}
.instrument(info_span!("manual_compaction", %tenant_id, %timeline_id))
@@ -1053,7 +1053,7 @@ async fn timeline_checkpoint_handler(
timeline
.compact(&cancel, &ctx)
.await
.map_err(ApiError::InternalServerError)?;
.map_err(|e| ApiError::InternalServerError(e.into()))?;
json_response(StatusCode::OK, ())
}
@@ -1160,11 +1160,11 @@ async fn disk_usage_eviction_run(
let state = get_state(&r);
let Some(storage) = state.remote_storage.clone() else {
if state.remote_storage.as_ref().is_none() {
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"remote storage not configured, cannot run eviction iteration"
)));
};
}
let state = state.disk_usage_eviction_state.clone();
@@ -1182,7 +1182,6 @@ async fn disk_usage_eviction_run(
async move {
let res = crate::disk_usage_eviction_task::disk_usage_eviction_task_iteration_impl(
&state,
&storage,
usage,
&child_cancel,
)

View File

@@ -133,9 +133,7 @@ pub(crate) mod timeline;
pub mod size;
pub(crate) use timeline::span::debug_assert_current_span_has_tenant_and_timeline_id;
pub use timeline::{
LocalLayerInfoForDiskUsageEviction, LogicalSizeCalculationCause, PageReconstructError, Timeline,
};
pub(crate) use timeline::{LogicalSizeCalculationCause, PageReconstructError, Timeline};
// re-export for use in remote_timeline_client.rs
pub use crate::tenant::metadata::save_metadata;
@@ -4041,6 +4039,7 @@ mod tests {
#[tokio::test]
async fn delta_layer_dumping() -> anyhow::Result<()> {
use storage_layer::AsLayerDesc;
let (tenant, ctx) = TenantHarness::create("test_layer_dumping")?.load().await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
@@ -4048,16 +4047,18 @@ mod tests {
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
let layer_map = tline.layers.read().await;
let level0_deltas = layer_map.layer_map().get_level0_deltas()?;
let level0_deltas = layer_map
.layer_map()
.get_level0_deltas()?
.into_iter()
.map(|desc| layer_map.get_from_desc(&desc))
.collect::<Vec<_>>();
assert!(!level0_deltas.is_empty());
for delta in level0_deltas {
let delta = layer_map.get_from_desc(&delta);
// Ensure we are dumping a delta layer here
let delta = delta.downcast_delta_layer().unwrap();
delta.dump(false, &ctx).await.unwrap();
assert!(delta.layer_desc().is_delta);
delta.dump(true, &ctx).await.unwrap();
}

View File

@@ -639,147 +639,10 @@ impl LayerMap {
}
println!("historic_layers:");
for layer in self.iter_historic_layers() {
layer.dump(verbose, ctx)?;
for desc in self.iter_historic_layers() {
desc.dump();
}
println!("End dump LayerMap");
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::LayerMap;
use crate::tenant::storage_layer::LayerFileName;
use std::str::FromStr;
use std::sync::Arc;
mod l0_delta_layers_updated {
use crate::tenant::{
storage_layer::{AsLayerDesc, PersistentLayerDesc},
timeline::layer_manager::LayerFileManager,
};
use super::*;
struct LayerObject(PersistentLayerDesc);
impl AsLayerDesc for LayerObject {
fn layer_desc(&self) -> &PersistentLayerDesc {
&self.0
}
}
impl LayerObject {
fn new(desc: PersistentLayerDesc) -> Self {
LayerObject(desc)
}
}
type TestLayerFileManager = LayerFileManager<LayerObject>;
#[test]
fn for_full_range_delta() {
// l0_delta_layers are used by compaction, and should observe all buffered updates
l0_delta_layers_updated_scenario(
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000053423C21-0000000053424D69",
true
)
}
#[test]
fn for_non_full_range_delta() {
// has minimal uncovered areas compared to l0_delta_layers_updated_on_insert_replace_remove_for_full_range_delta
l0_delta_layers_updated_scenario(
"000000000000000000000000000000000001-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFE__0000000053423C21-0000000053424D69",
// because not full range
false
)
}
#[test]
fn for_image() {
l0_delta_layers_updated_scenario(
"000000000000000000000000000000000000-000000000000000000000000000000010000__0000000053424D69",
// the code only checks whether it is a full-range layer and doesn't care about images, which must
// mean we should in practice never have full-range images
false
)
}
#[test]
fn replacing_missing_l0_is_notfound() {
// the original impl had an oversight: a missing L0 produced an anyhow::Error. anyhow::Error should,
// however, only happen for precondition failures.
let layer = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000053423C21-0000000053424D69";
let layer = LayerFileName::from_str(layer).unwrap();
let layer = PersistentLayerDesc::from(layer);
// same skeleton construction; see the scenario below
let not_found = Arc::new(LayerObject::new(layer.clone()));
let new_version = Arc::new(LayerObject::new(layer));
// after the immutable storage state refactor, the replace operation
// will not use layer map any more. We keep it here for consistency in test cases
// and can remove it in the future.
let _map = LayerMap::default();
let mut mapping = TestLayerFileManager::new();
mapping
.replace_and_verify(not_found, new_version)
.unwrap_err();
}
fn l0_delta_layers_updated_scenario(layer_name: &str, expected_l0: bool) {
let name = LayerFileName::from_str(layer_name).unwrap();
let skeleton = PersistentLayerDesc::from(name);
let remote = Arc::new(LayerObject::new(skeleton.clone()));
let downloaded = Arc::new(LayerObject::new(skeleton));
let mut map = LayerMap::default();
let mut mapping = LayerFileManager::new();
// two disjoint Arcs in different lifecycle phases. even if it seems they must be the
// same layer, we use LayerMap::compare_arced_layers as the identity of layers.
assert_eq!(remote.layer_desc(), downloaded.layer_desc());
let expected_in_counts = (1, usize::from(expected_l0));
map.batch_update()
.insert_historic(remote.layer_desc().clone());
mapping.insert(remote.clone());
assert_eq!(
count_layer_in(&map, remote.layer_desc()),
expected_in_counts
);
mapping
.replace_and_verify(remote, downloaded.clone())
.expect("name derived attributes are the same");
assert_eq!(
count_layer_in(&map, downloaded.layer_desc()),
expected_in_counts
);
map.batch_update().remove_historic(downloaded.layer_desc());
assert_eq!(count_layer_in(&map, downloaded.layer_desc()), (0, 0));
}
fn count_layer_in(map: &LayerMap, layer: &PersistentLayerDesc) -> (usize, usize) {
let historic = map
.iter_historic_layers()
.filter(|x| x.key() == layer.key())
.count();
let l0s = map
.get_level0_deltas()
.expect("why does this return a result");
let l0 = l0s.iter().filter(|x| x.key() == layer.key()).count();
(historic, l0)
}
}
}

View File

@@ -163,8 +163,6 @@
//! - download their remote [`IndexPart`]s
//! - create `Timeline` struct and a `RemoteTimelineClient`
//! - initialize the client's upload queue with its `IndexPart`
//! - create [`RemoteLayer`](super::storage_layer::RemoteLayer) instances
//! for layers that are referenced by `IndexPart` but not present locally
//! - schedule uploads for layers that are only present locally.
//! - if the remote `IndexPart`'s metadata was newer than the metadata in
//! the local filesystem, write the remote metadata to the local filesystem
@@ -233,7 +231,8 @@ use crate::metrics::{
};
use crate::task_mgr::shutdown_token;
use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
pub(crate) use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
use crate::tenant::storage_layer::AsLayerDesc;
use crate::tenant::upload_queue::Delete;
use crate::{
config::PageServerConf,
@@ -250,7 +249,7 @@ use utils::id::{TenantId, TimelineId};
use self::index::IndexPart;
use super::storage_layer::LayerFileName;
use super::storage_layer::{LayerFileName, ResidentLayer};
use super::upload_queue::SetDeletedFlagProgress;
// Occasional network issues and such can cause remote operations to fail, and
@@ -598,25 +597,25 @@ impl RemoteTimelineClient {
///
/// Launch an upload operation in the background.
///
pub fn schedule_layer_file_upload(
pub(crate) fn schedule_layer_file_upload(
self: &Arc<Self>,
layer_file_name: &LayerFileName,
layer_metadata: &LayerFileMetadata,
layer: ResidentLayer,
) -> anyhow::Result<()> {
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;
let metadata = LayerFileMetadata::new(layer.layer_desc().file_size);
upload_queue
.latest_files
.insert(layer_file_name.clone(), layer_metadata.clone());
.insert(layer.layer_desc().filename(), metadata.clone());
upload_queue.latest_files_changes_since_metadata_upload_scheduled += 1;
let op = UploadOp::UploadLayer(layer_file_name.clone(), layer_metadata.clone());
info!("scheduled layer file upload {layer}");
let op = UploadOp::UploadLayer(layer, metadata);
self.calls_unfinished_metric_begin(&op);
upload_queue.queued_operations.push_back(op);
info!("scheduled layer file upload {layer_file_name}");
// Launch the task immediately, if possible
self.launch_queued_tasks(upload_queue);
Ok(())
@@ -1054,11 +1053,8 @@ impl RemoteTimelineClient {
}
let upload_result: anyhow::Result<()> = match &task.op {
UploadOp::UploadLayer(ref layer_file_name, ref layer_metadata) => {
let path = &self
.conf
.timeline_path(&self.tenant_id, &self.timeline_id)
.join(layer_file_name.file_name());
UploadOp::UploadLayer(ref layer, ref layer_metadata) => {
let path = layer.local_path();
upload::upload_timeline_layer(
self.conf,
&self.storage_impl,
@@ -1367,6 +1363,7 @@ mod tests {
context::RequestContext,
tenant::{
harness::{TenantHarness, TIMELINE_ID},
storage_layer::Layer,
Tenant, Timeline,
},
DEFAULT_PG_VERSION,
@@ -1507,7 +1504,7 @@ mod tests {
let TestSetup {
harness,
tenant: _tenant,
timeline: _timeline,
timeline,
tenant_ctx: _tenant_ctx,
remote_fs_dir,
client,
@@ -1527,32 +1524,29 @@ mod tests {
.unwrap();
// Create a couple of dummy files, schedule upload for them
let layer_file_name_1: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap();
let layer_file_name_2: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D9-00000000016B5A52".parse().unwrap();
let layer_file_name_3: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59DA-00000000016B5A53".parse().unwrap();
let content_1 = dummy_contents("foo");
let content_2 = dummy_contents("bar");
let content_3 = dummy_contents("baz");
for (filename, content) in [
(&layer_file_name_1, &content_1),
(&layer_file_name_2, &content_2),
(&layer_file_name_3, &content_3),
] {
std::fs::write(timeline_path.join(filename.file_name()), content).unwrap();
}
let layers = [
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), dummy_contents("foo")),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D9-00000000016B5A52".parse().unwrap(), dummy_contents("bar")),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59DA-00000000016B5A53".parse().unwrap(), dummy_contents("baz"))
]
.into_iter()
.map(|(name, contents): (LayerFileName, Vec<u8>)| {
std::fs::write(timeline_path.join(name.file_name()), &contents).unwrap();
Layer::for_resident(
harness.conf,
&timeline,
name,
LayerFileMetadata::new(contents.len() as u64),
)
}).collect::<Vec<_>>();
client
.schedule_layer_file_upload(
&layer_file_name_1,
&LayerFileMetadata::new(content_1.len() as u64),
)
.schedule_layer_file_upload(layers[0].clone())
.unwrap();
client
.schedule_layer_file_upload(
&layer_file_name_2,
&LayerFileMetadata::new(content_2.len() as u64),
)
.schedule_layer_file_upload(layers[1].clone())
.unwrap();
// Check that they are started immediately, not queued
@@ -1605,21 +1599,18 @@ mod tests {
.map(|f| f.to_owned())
.collect(),
&[
&layer_file_name_1.file_name(),
&layer_file_name_2.file_name(),
&layers[0].layer_desc().filename().file_name(),
&layers[1].layer_desc().filename().file_name(),
],
);
assert_eq!(index_part.metadata, metadata);
// Schedule upload and then a deletion. Check that the deletion is queued
client
.schedule_layer_file_upload(
&layer_file_name_3,
&LayerFileMetadata::new(content_3.len() as u64),
)
.schedule_layer_file_upload(layers[2].clone())
.unwrap();
client
.schedule_layer_file_deletion(&[layer_file_name_1.clone()])
.schedule_layer_file_deletion(&[layers[0].layer_desc().filename()])
.unwrap();
{
let mut guard = client.upload_queue.lock().unwrap();
@@ -1634,8 +1625,8 @@ mod tests {
}
assert_remote_files(
&[
&layer_file_name_1.file_name(),
&layer_file_name_2.file_name(),
&layers[0].layer_desc().filename().file_name(),
&layers[1].layer_desc().filename().file_name(),
"index_part.json",
],
&remote_timeline_dir,
@@ -1646,8 +1637,8 @@ mod tests {
assert_remote_files(
&[
&layer_file_name_2.file_name(),
&layer_file_name_3.file_name(),
&layers[1].layer_desc().filename().file_name(),
&layers[2].layer_desc().filename().file_name(),
"index_part.json",
],
&remote_timeline_dir,
@@ -1661,7 +1652,7 @@ mod tests {
let TestSetup {
harness,
tenant: _tenant,
timeline: _timeline,
timeline,
client,
..
} = TestSetup::new("metrics").await.unwrap();
@@ -1681,6 +1672,13 @@ mod tests {
)
.unwrap();
let layer_file_1 = Layer::for_resident(
harness.conf,
&timeline,
layer_file_name_1.clone(),
LayerFileMetadata::new(content_1.len() as u64),
);
#[derive(Debug, PartialEq)]
struct BytesStartedFinished {
started: Option<usize>,
@@ -1706,10 +1704,7 @@ mod tests {
let init = get_bytes_started_stopped();
client
.schedule_layer_file_upload(
&layer_file_name_1,
&LayerFileMetadata::new(content_1.len() as u64),
)
.schedule_layer_file_upload(layer_file_1.clone())
.unwrap();
let pre = get_bytes_started_stopped();

View File

@@ -67,6 +67,8 @@ pub(super) async fn upload_timeline_layer<'a>(
// upload. However, a nonexistent file can also be indicative of
// something worse, like when a file is scheduled for upload before
// it has been written to disk yet.
//
// This is tested against `test_compaction_delete_before_upload`
info!(path = %source_path.display(), "File to upload doesn't exist. Likely the file has been deleted and an upload is not required any more.");
return Ok(());
}

View File

@@ -4,26 +4,21 @@ pub mod delta_layer;
mod filename;
mod image_layer;
mod inmemory_layer;
mod layer;
mod layer_desc;
mod remote_layer;
use crate::config::PageServerConf;
use crate::context::{AccessStatsBehavior, RequestContext};
use crate::repository::Key;
use crate::task_mgr::TaskKind;
use crate::walrecord::NeonWalRecord;
use anyhow::Result;
use bytes::Bytes;
use enum_map::EnumMap;
use enumset::EnumSet;
use once_cell::sync::Lazy;
use pageserver_api::models::LayerAccessKind;
use pageserver_api::models::{
HistoricLayerInfo, LayerResidenceEvent, LayerResidenceEventReason, LayerResidenceStatus,
LayerAccessKind, LayerResidenceEvent, LayerResidenceEventReason, LayerResidenceStatus,
};
use std::ops::Range;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::sync::Mutex;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tracing::warn;
use utils::history_buffer::HistoryBufferWithDropCounter;
@@ -39,7 +34,8 @@ pub use filename::{DeltaFileName, ImageFileName, LayerFileName};
pub use image_layer::{ImageLayer, ImageLayerWriter};
pub use inmemory_layer::InMemoryLayer;
pub use layer_desc::{PersistentLayerDesc, PersistentLayerKey};
pub use remote_layer::RemoteLayer;
pub(crate) use layer::{EvictionError, Layer, ResidentLayer};
pub fn range_overlaps<T>(a: &Range<T>, b: &Range<T>) -> bool
where
@@ -74,7 +70,7 @@ pub struct ValueReconstructState {
pub img: Option<(Lsn, Bytes)>,
}
/// Return value from Layer::get_page_reconstruct_data
/// Return value from [`Layer::get_value_reconstruct_data`]
#[derive(Clone, Copy, Debug)]
pub enum ValueReconstructResult {
/// Got all the data needed to reconstruct the requested page
@@ -179,26 +175,6 @@ impl LayerAccessStats {
new
}
/// Creates a clone of `self` and records `new_status` in the clone.
///
/// The `new_status` is not recorded in `self`.
///
/// See [`record_residence_event`] for why you need to do this while holding the layer map lock.
///
/// [`record_residence_event`]: Self::record_residence_event
pub(crate) fn clone_for_residence_change(
&self,
new_status: LayerResidenceStatus,
) -> LayerAccessStats {
let clone = {
let inner = self.0.lock().unwrap();
inner.clone()
};
let new = LayerAccessStats(Mutex::new(clone));
new.record_residence_event(new_status, LayerResidenceEventReason::ResidenceChange);
new
}
/// Record a change in layer residency.
///
/// Recording the event must happen while holding the layer map lock to
@@ -321,95 +297,12 @@ impl LayerAccessStats {
}
}
/// Supertrait of the [`Layer`] trait that captures the bare minimum interface
/// required by [`LayerMap`](super::layer_map::LayerMap).
///
/// All layers should implement a minimal `std::fmt::Debug` without tenant or
/// timeline names, because those are known in the context of which the layers
/// are used in (timeline).
#[async_trait::async_trait]
pub trait Layer: std::fmt::Debug + std::fmt::Display + Send + Sync + 'static {
///
/// Return data needed to reconstruct given page at LSN.
///
/// It is up to the caller to collect more data from previous layer and
/// perform WAL redo, if necessary.
///
/// See PageReconstructResult for possible return values. The collected data
/// is appended to reconstruct_data; the caller should pass an empty struct
/// on first call, or a struct with a cached older image of the page if one
/// is available. If this returns ValueReconstructResult::Continue, look up
/// the predecessor layer and call again with the same 'reconstruct_data' to
/// collect more data.
async fn get_value_reconstruct_data(
&self,
key: Key,
lsn_range: Range<Lsn>,
reconstruct_data: &mut ValueReconstructState,
ctx: &RequestContext,
) -> Result<ValueReconstructResult>;
}
/// Get a layer descriptor from a layer.
pub trait AsLayerDesc {
/// Get the layer descriptor.
fn layer_desc(&self) -> &PersistentLayerDesc;
}
/// A Layer contains all data in a "rectangle" consisting of a range of keys and
/// range of LSNs.
///
/// There are two kinds of layers, in-memory and on-disk layers. In-memory
/// layers are used to ingest incoming WAL, and provide fast access to the
/// recent page versions. On-disk layers are stored as files on disk, and are
/// immutable. This trait presents the common functionality of in-memory and
/// on-disk layers.
///
/// Furthermore, there are two kinds of on-disk layers: delta and image layers.
/// A delta layer contains all modifications within a range of LSNs and keys.
/// An image layer is a snapshot of all the data in a key-range, at a single
/// LSN.
pub trait PersistentLayer: Layer + AsLayerDesc {
/// File name used for this layer, both in the pageserver's local filesystem
/// state as well as in the remote storage.
fn filename(&self) -> LayerFileName {
self.layer_desc().filename()
}
// Path to the layer file in the local filesystem.
// `None` for `RemoteLayer`.
fn local_path(&self) -> Option<PathBuf>;
/// Permanently remove this layer from disk.
fn delete_resident_layer_file(&self) -> Result<()>;
fn downcast_remote_layer(self: Arc<Self>) -> Option<std::sync::Arc<RemoteLayer>> {
None
}
fn downcast_delta_layer(self: Arc<Self>) -> Option<std::sync::Arc<DeltaLayer>> {
None
}
fn is_remote_layer(&self) -> bool {
false
}
fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo;
fn access_stats(&self) -> &LayerAccessStats;
}
pub fn downcast_remote_layer(
layer: &Arc<dyn PersistentLayer>,
) -> Option<std::sync::Arc<RemoteLayer>> {
if layer.is_remote_layer() {
Arc::clone(layer).downcast_remote_layer()
} else {
None
}
}
pub mod tests {
use super::*;
@@ -447,19 +340,6 @@ pub mod tests {
}
}
/// Helper enum to hold a PageServerConf, or a path
///
/// This is used by DeltaLayer and ImageLayer. Normally, this holds a reference to the
/// global config, and paths to layer files are constructed using the tenant/timeline
/// path from the config. But in the 'pagectl' binary, we need to construct a Layer
/// struct for a file on disk, without having a page server running, so that we have no
/// config. In that case, we use the Path variant to hold the full path to the file on
/// disk.
enum PathOrConf {
Path(PathBuf),
Conf(&'static PageServerConf),
}
/// Range wrapping newtype, which uses display to render Debug.
///
/// Useful with `Key`, which has too verbose `{:?}` for printing multiple layers.

View File

@@ -34,17 +34,16 @@ use crate::repository::{Key, Value, KEY_SIZE};
use crate::tenant::blob_io::{BlobWriter, WriteBlobWriter};
use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, FileBlockReader};
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
use crate::tenant::storage_layer::{
PersistentLayer, ValueReconstructResult, ValueReconstructState,
};
use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
use crate::tenant::Timeline;
use crate::virtual_file::VirtualFile;
use crate::{walrecord, TEMP_FILE_SUFFIX};
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
use anyhow::{bail, ensure, Context, Result};
use pageserver_api::models::{HistoricLayerInfo, LayerAccessKind};
use pageserver_api::models::LayerAccessKind;
use rand::{distributions::Alphanumeric, Rng};
use serde::{Deserialize, Serialize};
use std::fs::{self, File};
use std::fs::File;
use std::io::{BufWriter, Write};
use std::io::{Seek, SeekFrom};
use std::ops::Range;
@@ -60,10 +59,7 @@ use utils::{
lsn::Lsn,
};
use super::{
AsLayerDesc, DeltaFileName, Layer, LayerAccessStats, LayerAccessStatsReset, PathOrConf,
PersistentLayerDesc,
};
use super::{AsLayerDesc, LayerAccessStats, PersistentLayerDesc, ResidentLayer};
///
/// Header stored in the beginning of the file
@@ -183,20 +179,12 @@ impl DeltaKey {
}
}
/// DeltaLayer is the in-memory data structure associated with an on-disk delta
/// file.
///
/// We keep a DeltaLayer in memory for each file, in the LayerMap. If a layer
/// is in "loaded" state, we have a copy of the index in memory, in 'inner'.
/// Otherwise the struct is just a placeholder for a file that exists on disk,
/// and it needs to be loaded before using it in queries.
/// This is used only from `pagectl`. Within pageserver, all layers are
/// [`crate::tenant::storage_layer::Layer`], which can hold a [`DeltaLayerInner`].
pub struct DeltaLayer {
path_or_conf: PathOrConf,
path: PathBuf,
pub desc: PersistentLayerDesc,
access_stats: LayerAccessStats,
inner: OnceCell<Arc<DeltaLayerInner>>,
}
@@ -213,6 +201,8 @@ impl std::fmt::Debug for DeltaLayer {
}
}
/// `DeltaLayerInner` is the in-memory data structure associated with an on-disk delta
/// file.
pub struct DeltaLayerInner {
// values copied from summary
index_start_blk: u32,
@@ -222,12 +212,6 @@ pub struct DeltaLayerInner {
file: FileBlockReader<VirtualFile>,
}
impl AsRef<DeltaLayerInner> for DeltaLayerInner {
fn as_ref(&self) -> &DeltaLayerInner {
self
}
}
impl std::fmt::Debug for DeltaLayerInner {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("DeltaLayerInner")
@@ -237,19 +221,6 @@ impl std::fmt::Debug for DeltaLayerInner {
}
}
#[async_trait::async_trait]
impl Layer for DeltaLayer {
async fn get_value_reconstruct_data(
&self,
key: Key,
lsn_range: Range<Lsn>,
reconstruct_state: &mut ValueReconstructState,
ctx: &RequestContext,
) -> anyhow::Result<ValueReconstructResult> {
self.get_value_reconstruct_data(key, lsn_range, reconstruct_state, ctx)
.await
}
}
/// Boilerplate to implement the Layer trait; always use layer_desc for persistent layers.
impl std::fmt::Display for DeltaLayer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
@@ -263,40 +234,9 @@ impl AsLayerDesc for DeltaLayer {
}
}
impl PersistentLayer for DeltaLayer {
fn downcast_delta_layer(self: Arc<Self>) -> Option<std::sync::Arc<DeltaLayer>> {
Some(self)
}
fn local_path(&self) -> Option<PathBuf> {
self.local_path()
}
fn delete_resident_layer_file(&self) -> Result<()> {
self.delete_resident_layer_file()
}
fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
self.info(reset)
}
fn access_stats(&self) -> &LayerAccessStats {
self.access_stats()
}
}
impl DeltaLayer {
pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
println!(
"----- delta layer for ten {} tli {} keys {}-{} lsn {}-{} size {} ----",
self.desc.tenant_id,
self.desc.timeline_id,
self.desc.key_range.start,
self.desc.key_range.end,
self.desc.lsn_range.start,
self.desc.lsn_range.end,
self.desc.file_size,
);
self.desc.dump();
if !verbose {
return Ok(());
@@ -304,119 +244,7 @@ impl DeltaLayer {
let inner = self.load(LayerAccessKind::Dump, ctx).await?;
println!(
"index_start_blk: {}, root {}",
inner.index_start_blk, inner.index_root_blk
);
let file = &inner.file;
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
inner.index_start_blk,
inner.index_root_blk,
file,
);
tree_reader.dump().await?;
let keys = DeltaLayerInner::load_keys(&inner).await?;
// A subroutine to dump a single blob
async fn dump_blob(val: ValueRef<'_>) -> Result<String> {
let buf = val.reader.read_blob(val.blob_ref.pos()).await?;
let val = Value::des(&buf)?;
let desc = match val {
Value::Image(img) => {
format!(" img {} bytes", img.len())
}
Value::WalRecord(rec) => {
let wal_desc = walrecord::describe_wal_record(&rec)?;
format!(
" rec {} bytes will_init: {} {}",
buf.len(),
rec.will_init(),
wal_desc
)
}
};
Ok(desc)
}
for entry in keys {
let DeltaEntry { key, lsn, val, .. } = entry;
let desc = match dump_blob(val).await {
Ok(desc) => desc,
Err(err) => {
let err: anyhow::Error = err;
format!("ERROR: {err}")
}
};
println!(" key {key} at {lsn}: {desc}");
}
Ok(())
}
pub(crate) async fn get_value_reconstruct_data(
&self,
key: Key,
lsn_range: Range<Lsn>,
reconstruct_state: &mut ValueReconstructState,
ctx: &RequestContext,
) -> anyhow::Result<ValueReconstructResult> {
ensure!(lsn_range.start >= self.desc.lsn_range.start);
ensure!(self.desc.key_range.contains(&key));
let inner = self
.load(LayerAccessKind::GetValueReconstructData, ctx)
.await?;
inner
.get_value_reconstruct_data(key, lsn_range, reconstruct_state)
.await
}
pub(crate) fn local_path(&self) -> Option<PathBuf> {
Some(self.path())
}
pub(crate) fn delete_resident_layer_file(&self) -> Result<()> {
// delete underlying file
fs::remove_file(self.path())?;
Ok(())
}
pub(crate) fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
let layer_file_name = self.layer_desc().filename().file_name();
let lsn_range = self.layer_desc().lsn_range.clone();
let access_stats = self.access_stats.as_api_model(reset);
HistoricLayerInfo::Delta {
layer_file_name,
layer_file_size: self.desc.file_size,
lsn_start: lsn_range.start,
lsn_end: lsn_range.end,
remote: false,
access_stats,
}
}
pub(crate) fn access_stats(&self) -> &LayerAccessStats {
&self.access_stats
}
fn path_for(
path_or_conf: &PathOrConf,
tenant_id: &TenantId,
timeline_id: &TimelineId,
fname: &DeltaFileName,
) -> PathBuf {
match path_or_conf {
PathOrConf::Path(path) => path.clone(),
PathOrConf::Conf(conf) => conf
.timeline_path(tenant_id, timeline_id)
.join(fname.to_string()),
}
inner.dump().await
}
fn temp_path_for(
@@ -462,52 +290,22 @@ impl DeltaLayer {
async fn load_inner(&self) -> Result<Arc<DeltaLayerInner>> {
let path = self.path();
let summary = match &self.path_or_conf {
PathOrConf::Conf(_) => Some(Summary::from(self)),
PathOrConf::Path(_) => None,
};
let loaded = DeltaLayerInner::load(&path, None)?;
let loaded = DeltaLayerInner::load(&path, summary)?;
// not production code
if let PathOrConf::Path(ref path) = self.path_or_conf {
// not production code
let actual_filename = self.path.file_name().unwrap().to_str().unwrap().to_owned();
let expected_filename = self.layer_desc().filename().file_name();
let actual_filename = path.file_name().unwrap().to_str().unwrap().to_owned();
let expected_filename = self.filename().file_name();
if actual_filename != expected_filename {
println!("warning: filename does not match what is expected from in-file summary");
println!("actual: {:?}", actual_filename);
println!("expected: {:?}", expected_filename);
}
if actual_filename != expected_filename {
println!("warning: filename does not match what is expected from in-file summary");
println!("actual: {:?}", actual_filename);
println!("expected: {:?}", expected_filename);
}
Ok(Arc::new(loaded))
}
/// Create a DeltaLayer struct representing an existing file on disk.
pub fn new(
conf: &'static PageServerConf,
timeline_id: TimelineId,
tenant_id: TenantId,
filename: &DeltaFileName,
file_size: u64,
access_stats: LayerAccessStats,
) -> DeltaLayer {
DeltaLayer {
path_or_conf: PathOrConf::Conf(conf),
desc: PersistentLayerDesc::new_delta(
tenant_id,
timeline_id,
filename.key_range.clone(),
filename.lsn_range.clone(),
file_size,
),
access_stats,
inner: OnceCell::new(),
}
}
/// Create a DeltaLayer struct representing an existing file on disk.
///
/// This variant is only used for debugging purposes, by the 'pagectl' binary.
@@ -522,7 +320,7 @@ impl DeltaLayer {
.context("get file metadata to determine size")?;
Ok(DeltaLayer {
path_or_conf: PathOrConf::Path(path.to_path_buf()),
path: path.to_path_buf(),
desc: PersistentLayerDesc::new_delta(
summary.tenant_id,
summary.timeline_id,
@@ -535,29 +333,9 @@ impl DeltaLayer {
})
}
fn layer_name(&self) -> DeltaFileName {
self.desc.delta_file_name()
}
/// Path to the layer file in pageserver workdir.
pub fn path(&self) -> PathBuf {
Self::path_for(
&self.path_or_conf,
&self.desc.tenant_id,
&self.desc.timeline_id,
&self.layer_name(),
)
}
/// Loads all keys stored in the layer. Returns key, lsn, value size and value reference.
///
/// The value can be obtained via the [`ValueRef::load`] function.
pub(crate) async fn load_keys(&self, ctx: &RequestContext) -> Result<Vec<DeltaEntry<'_>>> {
let inner = self
.load(LayerAccessKind::KeyIter, ctx)
.await
.context("load delta layer keys")?;
DeltaLayerInner::load_keys(inner)
.await
.context("Layer index is corrupted")
/// Path to the layer file
fn path(&self) -> PathBuf {
self.path.clone()
}
}
@@ -662,7 +440,7 @@ impl DeltaLayerWriterInner {
///
/// Finish writing the delta layer.
///
fn finish(self, key_end: Key) -> anyhow::Result<DeltaLayer> {
fn finish(self, key_end: Key, timeline: &Arc<Timeline>) -> anyhow::Result<ResidentLayer> {
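// Round the blob area up to whole pages: the index starts at the first page
// boundary after the values.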
let index_start_blk =
((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
@@ -708,37 +486,21 @@ impl DeltaLayerWriterInner {
// Note: Because we opened the file in write-only mode, we cannot
// reuse the same VirtualFile for reading later. That's why we don't
// set inner.file here. The first read will have to re-open it.
let layer = DeltaLayer {
path_or_conf: PathOrConf::Conf(self.conf),
desc: PersistentLayerDesc::new_delta(
self.tenant_id,
self.timeline_id,
self.key_start..key_end,
self.lsn_range.clone(),
metadata.len(),
),
access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
inner: OnceCell::new(),
};
let desc = PersistentLayerDesc::new_delta(
self.tenant_id,
self.timeline_id,
self.key_start..key_end,
self.lsn_range.clone(),
metadata.len(),
);
// fsync the file
file.sync_all()?;
// Rename the file to its final name
//
// Note: This overwrites any existing file. There shouldn't be any.
// FIXME: throw an error instead?
let final_path = DeltaLayer::path_for(
&PathOrConf::Conf(self.conf),
&self.tenant_id,
&self.timeline_id,
&DeltaFileName {
key_range: self.key_start..key_end,
lsn_range: self.lsn_range,
},
);
std::fs::rename(self.path, &final_path)?;
trace!("created delta layer {}", final_path.display());
let layer = Layer::finish_creating(self.conf, timeline, desc, &self.path)?;
trace!("created delta layer {}", layer.local_path().display());
Ok(layer)
}
@@ -821,8 +583,12 @@ impl DeltaLayerWriter {
///
/// Finish writing the delta layer.
///
pub fn finish(mut self, key_end: Key) -> anyhow::Result<DeltaLayer> {
self.inner.take().unwrap().finish(key_end)
pub(crate) fn finish(
mut self,
key_end: Key,
timeline: &Arc<Timeline>,
) -> anyhow::Result<ResidentLayer> {
self.inner.take().unwrap().finish(key_end, timeline)
}
}
@@ -949,14 +715,14 @@ impl DeltaLayerInner {
}
}
pub(super) async fn load_keys<T: AsRef<DeltaLayerInner> + Clone>(
this: &T,
) -> Result<Vec<DeltaEntry<'_>>> {
let dl = this.as_ref();
let file = &dl.file;
pub(super) async fn load_keys(&self) -> Result<Vec<DeltaEntry<'_>>> {
let file = &self.file;
let tree_reader =
DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(dl.index_start_blk, dl.index_root_blk, file);
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
self.index_start_blk,
self.index_root_blk,
file,
);
let mut all_keys: Vec<DeltaEntry<'_>> = Vec::new();
@@ -969,7 +735,7 @@ impl DeltaLayerInner {
let val_ref = ValueRef {
blob_ref: BlobRef(value),
reader: BlockCursor::new(crate::tenant::block_io::BlockReaderRef::Adapter(
Adapter(dl),
Adapter(self),
)),
};
let pos = BlobRef(value).pos();
@@ -993,10 +759,61 @@ impl DeltaLayerInner {
if let Some(last) = all_keys.last_mut() {
// Last key occupies all space till end of value storage,
// which corresponds to beginning of the index
last.size = dl.index_start_blk as u64 * PAGE_SZ as u64 - last.size;
last.size = self.index_start_blk as u64 * PAGE_SZ as u64 - last.size;
}
Ok(all_keys)
}
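// An illustrative sketch of the computation above (PAGE_SZ assumed to be the
// usual 8 KiB page; `last.size` temporarily holds the last blob's start offset,
// as the surrounding code suggests): with index_start_blk = 10 the value
// storage ends at byte 10 * 8192 = 81920, so a last blob starting at byte
// 81000 is 81920 - 81000 = 920 bytes long. The helper name is hypothetical.
fn last_entry_size_example(index_start_blk: u32, last_blob_start: u64) -> u64 {
    const PAGE_SZ: u64 = 8192;
    index_start_blk as u64 * PAGE_SZ - last_blob_start
}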
pub(super) async fn dump(&self) -> anyhow::Result<()> {
println!(
"index_start_blk: {}, root {}",
self.index_start_blk, self.index_root_blk
);
let file = &self.file;
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
self.index_start_blk,
self.index_root_blk,
file,
);
tree_reader.dump().await?;
let keys = self.load_keys().await?;
async fn dump_blob(val: ValueRef<'_>) -> anyhow::Result<String> {
let buf = val.reader.read_blob(val.blob_ref.pos()).await?;
let val = Value::des(&buf)?;
let desc = match val {
Value::Image(img) => {
format!(" img {} bytes", img.len())
}
Value::WalRecord(rec) => {
let wal_desc = walrecord::describe_wal_record(&rec)?;
format!(
" rec {} bytes will_init: {} {}",
buf.len(),
rec.will_init(),
wal_desc
)
}
};
Ok(desc)
}
for entry in keys {
let DeltaEntry { key, lsn, val, .. } = entry;
let desc = match dump_blob(val).await {
Ok(desc) => desc,
Err(err) => {
format!("ERROR: {err}")
}
};
println!(" key {key} at {lsn}: {desc}");
}
Ok(())
}
}
/// A set of data associated with a delta layer key and its value
@@ -1032,3 +849,9 @@ impl<T: AsRef<DeltaLayerInner>> Adapter<T> {
self.0.as_ref().file.read_blk(blknum)
}
}
impl AsRef<DeltaLayerInner> for DeltaLayerInner {
fn as_ref(&self) -> &DeltaLayerInner {
self
}
}

View File

@@ -31,22 +31,24 @@ use crate::tenant::blob_io::{BlobWriter, WriteBlobWriter};
use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader};
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
use crate::tenant::storage_layer::{
LayerAccessStats, PersistentLayer, ValueReconstructResult, ValueReconstructState,
LayerAccessStats, ValueReconstructResult, ValueReconstructState,
};
use crate::tenant::Timeline;
use crate::virtual_file::VirtualFile;
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
use anyhow::{bail, ensure, Context, Result};
use bytes::Bytes;
use hex;
use pageserver_api::models::{HistoricLayerInfo, LayerAccessKind};
use pageserver_api::models::LayerAccessKind;
use rand::{distributions::Alphanumeric, Rng};
use serde::{Deserialize, Serialize};
use std::fs::{self, File};
use std::fs::File;
use std::io::Write;
use std::io::{Seek, SeekFrom};
use std::ops::Range;
use std::os::unix::prelude::FileExt;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::sync::OnceCell;
use tracing::*;
@@ -57,7 +59,7 @@ use utils::{
};
use super::filename::ImageFileName;
use super::{AsLayerDesc, Layer, LayerAccessStatsReset, PathOrConf, PersistentLayerDesc};
use super::{AsLayerDesc, Layer, PersistentLayerDesc, ResidentLayer};
///
/// Header stored at the beginning of the file
@@ -115,22 +117,14 @@ impl Summary {
}
}
/// ImageLayer is the in-memory data structure associated with an on-disk image
/// file.
///
/// We keep an ImageLayer in memory for each file, in the LayerMap. If a layer
/// is in "loaded" state, we have a copy of the index in memory, in 'inner'.
/// Otherwise the struct is just a placeholder for a file that exists on disk,
/// and it needs to be loaded before using it in queries.
/// This is used only from `pagectl`. Within pageserver, all layers are
/// [`crate::tenant::storage_layer::Layer`], which can hold an [`ImageLayerInner`].
pub struct ImageLayer {
path_or_conf: PathOrConf,
path: PathBuf,
pub desc: PersistentLayerDesc,
// This entry contains an image of all pages as of this LSN; it should be the same as desc.lsn
pub lsn: Lsn,
access_stats: LayerAccessStats,
inner: OnceCell<ImageLayerInner>,
}
@@ -147,6 +141,8 @@ impl std::fmt::Debug for ImageLayer {
}
}
/// ImageLayer is the in-memory data structure associated with an on-disk image
/// file.
pub struct ImageLayerInner {
// values copied from summary
index_start_blk: u32,
@@ -167,18 +163,22 @@ impl std::fmt::Debug for ImageLayerInner {
}
}
#[async_trait::async_trait]
impl Layer for ImageLayer {
/// Look up given page in the file
async fn get_value_reconstruct_data(
&self,
key: Key,
lsn_range: Range<Lsn>,
reconstruct_state: &mut ValueReconstructState,
ctx: &RequestContext,
) -> anyhow::Result<ValueReconstructResult> {
self.get_value_reconstruct_data(key, lsn_range, reconstruct_state, ctx)
.await
impl ImageLayerInner {
pub(super) async fn dump(&self) -> anyhow::Result<()> {
let file = &self.file;
let tree_reader =
DiskBtreeReader::<_, KEY_SIZE>::new(self.index_start_blk, self.index_root_blk, file);
tree_reader.dump().await?;
tree_reader
.visit(&[0u8; KEY_SIZE], VisitDirection::Forwards, |key, value| {
println!("key: {} offset {}", hex::encode(key), value);
true
})
.await?;
Ok(())
}
}
@@ -195,120 +195,21 @@ impl AsLayerDesc for ImageLayer {
}
}
impl PersistentLayer for ImageLayer {
fn local_path(&self) -> Option<PathBuf> {
self.local_path()
}
fn delete_resident_layer_file(&self) -> Result<()> {
self.delete_resident_layer_file()
}
fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
self.info(reset)
}
fn access_stats(&self) -> &LayerAccessStats {
self.access_stats()
}
}
impl ImageLayer {
pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
println!(
"----- image layer for ten {} tli {} key {}-{} at {} is_incremental {} size {} ----",
self.desc.tenant_id,
self.desc.timeline_id,
self.desc.key_range.start,
self.desc.key_range.end,
self.lsn,
self.desc.is_incremental(),
self.desc.file_size
);
self.desc.dump();
if !verbose {
return Ok(());
}
let inner = self.load(LayerAccessKind::Dump, ctx).await?;
let file = &inner.file;
let tree_reader =
DiskBtreeReader::<_, KEY_SIZE>::new(inner.index_start_blk, inner.index_root_blk, file);
tree_reader.dump().await?;
tree_reader
.visit(&[0u8; KEY_SIZE], VisitDirection::Forwards, |key, value| {
println!("key: {} offset {}", hex::encode(key), value);
true
})
.await?;
inner.dump().await?;
Ok(())
}
pub(crate) async fn get_value_reconstruct_data(
&self,
key: Key,
lsn_range: Range<Lsn>,
reconstruct_state: &mut ValueReconstructState,
ctx: &RequestContext,
) -> anyhow::Result<ValueReconstructResult> {
assert!(self.desc.key_range.contains(&key));
assert!(lsn_range.start >= self.lsn);
assert!(lsn_range.end >= self.lsn);
let inner = self
.load(LayerAccessKind::GetValueReconstructData, ctx)
.await?;
inner
.get_value_reconstruct_data(key, reconstruct_state)
.await
// FIXME: makes no sense to dump paths
.with_context(|| format!("read {}", self.path().display()))
}
pub(crate) fn local_path(&self) -> Option<PathBuf> {
Some(self.path())
}
pub(crate) fn delete_resident_layer_file(&self) -> Result<()> {
// delete underlying file
fs::remove_file(self.path())?;
Ok(())
}
pub(crate) fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
let layer_file_name = self.layer_desc().filename().file_name();
let lsn_start = self.layer_desc().image_layer_lsn();
HistoricLayerInfo::Image {
layer_file_name,
layer_file_size: self.desc.file_size,
lsn_start,
remote: false,
access_stats: self.access_stats.as_api_model(reset),
}
}
pub(crate) fn access_stats(&self) -> &LayerAccessStats {
&self.access_stats
}
fn path_for(
path_or_conf: &PathOrConf,
timeline_id: TimelineId,
tenant_id: TenantId,
fname: &ImageFileName,
) -> PathBuf {
match path_or_conf {
PathOrConf::Path(path) => path.to_path_buf(),
PathOrConf::Conf(conf) => conf
.timeline_path(&tenant_id, &timeline_id)
.join(fname.to_string()),
}
}
fn temp_path_for(
conf: &PageServerConf,
timeline_id: TimelineId,
@@ -344,52 +245,21 @@ impl ImageLayer {
async fn load_inner(&self) -> Result<ImageLayerInner> {
let path = self.path();
let expected_summary = match &self.path_or_conf {
PathOrConf::Conf(_) => Some(Summary::from(self)),
PathOrConf::Path(_) => None,
};
let loaded = ImageLayerInner::load(&path, self.desc.image_layer_lsn(), None)?;
let loaded = ImageLayerInner::load(&path, self.desc.image_layer_lsn(), expected_summary)?;
// not production code
let actual_filename = self.path.file_name().unwrap().to_str().unwrap().to_owned();
let expected_filename = self.layer_desc().filename().file_name();
if let PathOrConf::Path(ref path) = self.path_or_conf {
// not production code
let actual_filename = path.file_name().unwrap().to_str().unwrap().to_owned();
let expected_filename = self.filename().file_name();
if actual_filename != expected_filename {
println!("warning: filename does not match what is expected from in-file summary");
println!("actual: {:?}", actual_filename);
println!("expected: {:?}", expected_filename);
}
if actual_filename != expected_filename {
println!("warning: filename does not match what is expected from in-file summary");
println!("actual: {:?}", actual_filename);
println!("expected: {:?}", expected_filename);
}
Ok(loaded)
}
/// Create an ImageLayer struct representing an existing file on disk
pub fn new(
conf: &'static PageServerConf,
timeline_id: TimelineId,
tenant_id: TenantId,
filename: &ImageFileName,
file_size: u64,
access_stats: LayerAccessStats,
) -> ImageLayer {
ImageLayer {
path_or_conf: PathOrConf::Conf(conf),
desc: PersistentLayerDesc::new_img(
tenant_id,
timeline_id,
filename.key_range.clone(),
filename.lsn,
file_size,
), // Now we assume image layer ALWAYS covers the full range. This may change in the future.
lsn: filename.lsn,
access_stats,
inner: OnceCell::new(),
}
}
/// Create an ImageLayer struct representing an existing file on disk.
///
/// This variant is only used for debugging purposes, by the 'pagectl' binary.
@@ -402,7 +272,7 @@ impl ImageLayer {
.metadata()
.context("get file metadata to determine size")?;
Ok(ImageLayer {
path_or_conf: PathOrConf::Path(path.to_path_buf()),
path: path.to_path_buf(),
desc: PersistentLayerDesc::new_img(
summary.tenant_id,
summary.timeline_id,
@@ -416,18 +286,9 @@ impl ImageLayer {
})
}
fn layer_name(&self) -> ImageFileName {
self.desc.image_file_name()
}
/// Path to the layer file in pageserver workdir.
pub fn path(&self) -> PathBuf {
Self::path_for(
&self.path_or_conf,
self.desc.timeline_id,
self.desc.tenant_id,
&self.layer_name(),
)
fn path(&self) -> PathBuf {
self.path.clone()
}
}
@@ -582,7 +443,7 @@ impl ImageLayerWriterInner {
///
/// Finish writing the image layer.
///
fn finish(self) -> anyhow::Result<ImageLayer> {
fn finish(self, timeline: &Arc<Timeline>) -> anyhow::Result<ResidentLayer> {
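// As in the delta writer: round the blob area up to whole pages so the index
// starts at the next page boundary after the values.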
let index_start_blk =
((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
@@ -624,33 +485,13 @@ impl ImageLayerWriterInner {
// Note: Because we open the file in write-only mode, we cannot
// reuse the same VirtualFile for reading later. That's why we don't
// set inner.file here. The first read will have to re-open it.
let layer = ImageLayer {
path_or_conf: PathOrConf::Conf(self.conf),
desc,
lsn: self.lsn,
access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
inner: OnceCell::new(),
};
// fsync the file
file.sync_all()?;
// Rename the file to its final name
//
// Note: This overwrites any existing file. There shouldn't be any.
// FIXME: throw an error instead?
let final_path = ImageLayer::path_for(
&PathOrConf::Conf(self.conf),
self.timeline_id,
self.tenant_id,
&ImageFileName {
key_range: self.key_range.clone(),
lsn: self.lsn,
},
);
std::fs::rename(self.path, final_path)?;
let layer = Layer::finish_creating(self.conf, timeline, desc, &self.path)?;
trace!("created image layer {}", layer.path().display());
trace!("created image layer {}", layer.local_path().display());
Ok(layer)
}
@@ -716,8 +557,11 @@ impl ImageLayerWriter {
///
/// Finish writing the image layer.
///
pub fn finish(mut self) -> anyhow::Result<ImageLayer> {
self.inner.take().unwrap().finish()
pub(crate) fn finish(
mut self,
timeline: &Arc<Timeline>,
) -> anyhow::Result<super::ResidentLayer> {
self.inner.take().unwrap().finish(timeline)
}
}

View File

@@ -10,11 +10,12 @@ use crate::repository::{Key, Value};
use crate::tenant::block_io::BlockReader;
use crate::tenant::ephemeral_file::EphemeralFile;
use crate::tenant::storage_layer::{ValueReconstructResult, ValueReconstructState};
use crate::tenant::Timeline;
use crate::walrecord;
use anyhow::{ensure, Result};
use pageserver_api::models::InMemoryLayerInfo;
use std::collections::HashMap;
use std::sync::OnceLock;
use std::sync::{Arc, OnceLock};
use tracing::*;
use utils::{
bin_ser::BeSer,
@@ -28,7 +29,7 @@ use std::fmt::Write as _;
use std::ops::Range;
use tokio::sync::RwLock;
use super::{DeltaLayer, DeltaLayerWriter, Layer};
use super::{DeltaLayerWriter, ResidentLayer};
pub struct InMemoryLayer {
conf: &'static PageServerConf,
@@ -203,20 +204,6 @@ impl InMemoryLayer {
}
}
#[async_trait::async_trait]
impl Layer for InMemoryLayer {
async fn get_value_reconstruct_data(
&self,
key: Key,
lsn_range: Range<Lsn>,
reconstruct_data: &mut ValueReconstructState,
ctx: &RequestContext,
) -> Result<ValueReconstructResult> {
self.get_value_reconstruct_data(key, lsn_range, reconstruct_data, ctx)
.await
}
}
impl std::fmt::Display for InMemoryLayer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let end_lsn = self.end_lsn_or_max();
@@ -225,17 +212,13 @@ impl std::fmt::Display for InMemoryLayer {
}
impl InMemoryLayer {
///
/// Get layer size.
///
pub async fn size(&self) -> Result<u64> {
let inner = self.inner.read().await;
Ok(inner.file.len())
}
///
/// Create a new, empty, in-memory layer
///
pub fn create(
conf: &'static PageServerConf,
timeline_id: TimelineId,
@@ -313,7 +296,7 @@ impl InMemoryLayer {
/// Write this frozen in-memory layer to disk.
///
/// Returns a new delta layer with all the same data as this in-memory layer
pub(crate) async fn write_to_disk(&self) -> Result<DeltaLayer> {
pub(crate) async fn write_to_disk(&self, timeline: &Arc<Timeline>) -> Result<ResidentLayer> {
// Grab the lock in read-mode. We hold it over the I/O, but because this
// layer is not writeable anymore, no one should be trying to acquire the
// write lock on it, so we shouldn't block anyone. There's one exception
@@ -352,7 +335,7 @@ impl InMemoryLayer {
}
}
let delta_layer = delta_layer_writer.finish(Key::MAX)?;
let delta_layer = delta_layer_writer.finish(Key::MAX, timeline)?;
Ok(delta_layer)
}
}

File diff suppressed because it is too large

View File

@@ -1,4 +1,3 @@
use anyhow::Result;
use core::fmt::Display;
use std::ops::Range;
use utils::{
@@ -6,7 +5,7 @@ use utils::{
lsn::Lsn,
};
use crate::{context::RequestContext, repository::Key};
use crate::repository::Key;
use super::{DeltaFileName, ImageFileName, LayerFileName};
@@ -100,6 +99,22 @@ impl PersistentLayerDesc {
}
}
pub fn from_filename(
tenant_id: TenantId,
timeline_id: TimelineId,
filename: LayerFileName,
file_size: u64,
) -> Self {
match filename {
LayerFileName::Image(i) => {
Self::new_img(tenant_id, timeline_id, i.key_range, i.lsn, file_size)
}
LayerFileName::Delta(d) => {
Self::new_delta(tenant_id, timeline_id, d.key_range, d.lsn_range, file_size)
}
}
}
/// Get the LSN that the image layer covers.
pub fn image_layer_lsn(&self) -> Lsn {
assert!(!self.is_delta);
@@ -173,21 +188,31 @@ impl PersistentLayerDesc {
self.is_delta
}
pub fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> {
println!(
"----- layer for ten {} tli {} keys {}-{} lsn {}-{} is_delta {} is_incremental {} size {} ----",
self.tenant_id,
self.timeline_id,
self.key_range.start,
self.key_range.end,
self.lsn_range.start,
self.lsn_range.end,
self.is_delta,
self.is_incremental(),
self.file_size,
);
Ok(())
pub fn dump(&self) {
if self.is_delta {
println!(
"----- delta layer for ten {} tli {} keys {}-{} lsn {}-{} is_incremental {} size {} ----",
self.tenant_id,
self.timeline_id,
self.key_range.start,
self.key_range.end,
self.lsn_range.start,
self.lsn_range.end,
self.is_incremental(),
self.file_size,
);
} else {
println!(
"----- image layer for ten {} tli {} key {}-{} at {} is_incremental {} size {} ----",
self.tenant_id,
self.timeline_id,
self.key_range.start,
self.key_range.end,
self.image_layer_lsn(),
self.is_incremental(),
self.file_size
);
}
}
pub fn file_size(&self) -> u64 {

View File

@@ -1,216 +0,0 @@
//! A RemoteLayer is an in-memory placeholder for a layer file that exists
//! in remote storage.
//!
use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::repository::Key;
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
use crate::tenant::timeline::layer_manager::LayerManager;
use anyhow::{bail, Result};
use pageserver_api::models::HistoricLayerInfo;
use std::ops::Range;
use std::path::PathBuf;
use std::sync::Arc;
use utils::{
id::{TenantId, TimelineId},
lsn::Lsn,
};
use super::filename::{DeltaFileName, ImageFileName};
use super::{
AsLayerDesc, DeltaLayer, ImageLayer, LayerAccessStats, LayerAccessStatsReset,
LayerResidenceStatus, PersistentLayer, PersistentLayerDesc,
};
/// RemoteLayer is a not yet downloaded [`ImageLayer`] or
/// [`DeltaLayer`](super::DeltaLayer).
///
/// RemoteLayer might be downloaded on-demand during operations which are
/// allowed to download remote layers and during which it gets replaced with a
/// concrete `DeltaLayer` or `ImageLayer`.
///
/// See: [`crate::context::RequestContext`] for authorization to download
pub struct RemoteLayer {
pub desc: PersistentLayerDesc,
pub layer_metadata: LayerFileMetadata,
access_stats: LayerAccessStats,
pub(crate) ongoing_download: Arc<tokio::sync::Semaphore>,
/// Has `LayerMap::replace` failed for this (true) or not (false).
///
/// Used together with [`ongoing_download`] semaphore in `Timeline::download_remote_layer`.
/// The field is used to mark a RemoteLayer permanently (until restart or ignore+load)
/// unprocessable, because a LayerMap::replace failed.
///
/// It is very unlikely to accumulate these in the Timeline's LayerMap, but having this avoids
/// a possible fast loop between `Timeline::get_reconstruct_data` and
/// `Timeline::download_remote_layer`, which also logs.
///
/// [`ongoing_download`]: Self::ongoing_download
pub(crate) download_replacement_failure: std::sync::atomic::AtomicBool,
}
impl std::fmt::Debug for RemoteLayer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RemoteLayer")
.field("file_name", &self.desc.filename())
.field("layer_metadata", &self.layer_metadata)
.field("is_incremental", &self.desc.is_incremental())
.finish()
}
}
#[async_trait::async_trait]
impl Layer for RemoteLayer {
async fn get_value_reconstruct_data(
&self,
_key: Key,
_lsn_range: Range<Lsn>,
_reconstruct_state: &mut ValueReconstructState,
_ctx: &RequestContext,
) -> Result<ValueReconstructResult> {
bail!("layer {self} needs to be downloaded");
}
}
/// Boilerplate to implement the Layer trait; always use layer_desc for persistent layers.
impl std::fmt::Display for RemoteLayer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.layer_desc().short_id())
}
}
impl AsLayerDesc for RemoteLayer {
fn layer_desc(&self) -> &PersistentLayerDesc {
&self.desc
}
}
impl PersistentLayer for RemoteLayer {
fn local_path(&self) -> Option<PathBuf> {
None
}
fn delete_resident_layer_file(&self) -> Result<()> {
bail!("remote layer has no layer file");
}
fn downcast_remote_layer<'a>(self: Arc<Self>) -> Option<std::sync::Arc<RemoteLayer>> {
Some(self)
}
fn is_remote_layer(&self) -> bool {
true
}
fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
let layer_file_name = self.layer_desc().filename().file_name();
let lsn_range = self.layer_desc().lsn_range.clone();
if self.desc.is_delta {
HistoricLayerInfo::Delta {
layer_file_name,
layer_file_size: self.layer_metadata.file_size(),
lsn_start: lsn_range.start,
lsn_end: lsn_range.end,
remote: true,
access_stats: self.access_stats.as_api_model(reset),
}
} else {
HistoricLayerInfo::Image {
layer_file_name,
layer_file_size: self.layer_metadata.file_size(),
lsn_start: lsn_range.start,
remote: true,
access_stats: self.access_stats.as_api_model(reset),
}
}
}
fn access_stats(&self) -> &LayerAccessStats {
&self.access_stats
}
}
impl RemoteLayer {
pub fn new_img(
tenantid: TenantId,
timelineid: TimelineId,
fname: &ImageFileName,
layer_metadata: &LayerFileMetadata,
access_stats: LayerAccessStats,
) -> RemoteLayer {
RemoteLayer {
desc: PersistentLayerDesc::new_img(
tenantid,
timelineid,
fname.key_range.clone(),
fname.lsn,
layer_metadata.file_size(),
),
layer_metadata: layer_metadata.clone(),
ongoing_download: Arc::new(tokio::sync::Semaphore::new(1)),
download_replacement_failure: std::sync::atomic::AtomicBool::default(),
access_stats,
}
}
pub fn new_delta(
tenantid: TenantId,
timelineid: TimelineId,
fname: &DeltaFileName,
layer_metadata: &LayerFileMetadata,
access_stats: LayerAccessStats,
) -> RemoteLayer {
RemoteLayer {
desc: PersistentLayerDesc::new_delta(
tenantid,
timelineid,
fname.key_range.clone(),
fname.lsn_range.clone(),
layer_metadata.file_size(),
),
layer_metadata: layer_metadata.clone(),
ongoing_download: Arc::new(tokio::sync::Semaphore::new(1)),
download_replacement_failure: std::sync::atomic::AtomicBool::default(),
access_stats,
}
}
/// Create a Layer struct representing this layer, after it has been downloaded.
pub(crate) fn create_downloaded_layer(
&self,
_layer_map_lock_held_witness: &LayerManager,
conf: &'static PageServerConf,
file_size: u64,
) -> Arc<dyn PersistentLayer> {
if self.desc.is_delta {
let fname = self.desc.delta_file_name();
Arc::new(DeltaLayer::new(
conf,
self.desc.timeline_id,
self.desc.tenant_id,
&fname,
file_size,
self.access_stats
.clone_for_residence_change(LayerResidenceStatus::Resident),
))
} else {
let fname = self.desc.image_file_name();
Arc::new(ImageLayer::new(
conf,
self.desc.timeline_id,
self.desc.tenant_id,
&fname,
file_size,
self.access_stats
.clone_for_residence_change(LayerResidenceStatus::Resident),
))
}
}
}

File diff suppressed because it is too large

View File

@@ -29,7 +29,6 @@ use crate::{
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
tenant::{
config::{EvictionPolicy, EvictionPolicyLayerAccessThreshold},
storage_layer::PersistentLayer,
timeline::EvictionError,
LogicalSizeCalculationCause, Tenant,
},
@@ -194,15 +193,26 @@ impl Timeline {
// NB: all the checks can be invalidated as soon as we release the layer map lock.
// We don't want to hold the layer map lock during eviction.
// So, we just need to deal with this.
let candidates: Vec<Arc<dyn PersistentLayer>> = {
let candidates: Vec<_> = {
let guard = self.layers.read().await;
let layers = guard.layer_map();
let mut candidates = Vec::new();
for hist_layer in layers.iter_historic_layers() {
let hist_layer = guard.get_from_desc(&hist_layer);
if hist_layer.is_remote_layer() {
continue;
}
// guard against eviction while we inspect it; it might be that eviction_task and
// disk_usage_eviction_task both select the same layers to be evicted, and
// seemingly free up double the space. both succeeding is of no consequence.
let guard = match hist_layer.keep_resident().await {
Ok(Some(l)) => l,
Ok(None) => continue,
Err(e) => {
// these should not happen, but we cannot make them statically impossible right
// now.
tracing::warn!(layer=%hist_layer, "failed to keep the layer resident: {e:#}");
continue;
}
};
let last_activity_ts = hist_layer.access_stats().latest_activity().unwrap_or_else(|| {
// We only use this fallback if there's an implementation error.
@@ -233,7 +243,7 @@ impl Timeline {
}
};
if no_activity_for > p.threshold {
candidates.push(hist_layer)
candidates.push(guard.drop_eviction_guard())
}
}
candidates
@@ -252,7 +262,7 @@ impl Timeline {
};
let results = match self
.evict_layer_batch(remote_client, &candidates[..], cancel.clone())
.evict_layer_batch(remote_client, &candidates, cancel)
.await
{
Err(pre_err) => {
@@ -263,7 +273,7 @@ impl Timeline {
Ok(results) => results,
};
assert_eq!(results.len(), candidates.len());
for (l, result) in candidates.iter().zip(results) {
for result in results {
match result {
None => {
stats.skipped_for_shutdown += 1;
@@ -271,20 +281,10 @@ impl Timeline {
Some(Ok(())) => {
stats.evicted += 1;
}
Some(Err(EvictionError::CannotEvictRemoteLayer)) => {
stats.not_evictable += 1;
}
Some(Err(EvictionError::FileNotFound)) => {
Some(Err(EvictionError::NotFound | EvictionError::Downloaded)) => {
// compaction/gc removed the file while we were waiting on layer_removal_cs
stats.not_evictable += 1;
}
Some(Err(
e @ EvictionError::LayerNotFound(_) | e @ EvictionError::StatFailed(_),
)) => {
let e = utils::error::report_compact_sources(&e);
warn!(layer = %l, "failed to evict layer: {e}");
stats.not_evictable += 1;
}
}
}
if stats.candidates == stats.not_evictable {

View File

@@ -8,21 +8,19 @@ use utils::{
use crate::{
config::PageServerConf,
metrics::TimelineMetrics,
tenant::{
layer_map::{BatchedUpdates, LayerMap},
storage_layer::{
AsLayerDesc, DeltaLayer, ImageLayer, InMemoryLayer, PersistentLayer,
PersistentLayerDesc, PersistentLayerKey,
AsLayerDesc, InMemoryLayer, Layer, PersistentLayerDesc, PersistentLayerKey,
ResidentLayer,
},
timeline::compare_arced_layers,
},
};
/// Provides semantic APIs to manipulate the layer map.
pub(crate) struct LayerManager {
layer_map: LayerMap,
layer_fmgr: LayerFileManager,
layer_fmgr: LayerFileManager<Layer>,
}
/// After GC, the layer map changes will not be applied immediately. Users should manually apply the changes after
@@ -43,7 +41,7 @@ impl LayerManager {
}
}
pub(crate) fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Arc<dyn PersistentLayer> {
pub(crate) fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Layer {
self.layer_fmgr.get_from_desc(desc)
}
@@ -55,21 +53,12 @@ impl LayerManager {
&self.layer_map
}
/// Replace layers in the layer file manager, used in evictions and layer downloads.
pub(crate) fn replace_and_verify(
&mut self,
expected: Arc<dyn PersistentLayer>,
new: Arc<dyn PersistentLayer>,
) -> Result<()> {
self.layer_fmgr.replace_and_verify(expected, new)
}
/// Called from `load_layer_map`. Initialize the layer manager with:
/// 1. all on-disk layers
/// 2. next open layer (at the disk_consistent_lsn)
pub(crate) fn initialize_local_layers(
&mut self,
on_disk_layers: Vec<Arc<dyn PersistentLayer>>,
on_disk_layers: Vec<Layer>,
next_open_layer_at: Lsn,
) {
let mut updates = self.layer_map.batch_update();
@@ -164,10 +153,10 @@ impl LayerManager {
}
/// Add image layers to the layer map, called from `create_image_layers`.
pub(crate) fn track_new_image_layers(&mut self, image_layers: Vec<ImageLayer>) {
pub(crate) fn track_new_image_layers(&mut self, image_layers: &[ResidentLayer]) {
let mut updates = self.layer_map.batch_update();
for layer in image_layers {
Self::insert_historic_layer(Arc::new(layer), &mut updates, &mut self.layer_fmgr);
Self::insert_historic_layer(layer.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
}
updates.flush();
}
@@ -175,46 +164,47 @@ impl LayerManager {
/// Flush a frozen layer and add the written delta layer to the layer map.
pub(crate) fn finish_flush_l0_layer(
&mut self,
delta_layer: Option<DeltaLayer>,
delta_layer: Option<&ResidentLayer>,
frozen_layer_for_check: &Arc<InMemoryLayer>,
) {
let l = self.layer_map.frozen_layers.pop_front();
let mut updates = self.layer_map.batch_update();
let inmem = self
.layer_map
.frozen_layers
.pop_front()
.expect("there must be a inmem layer to flush");
// Only one thread may call this function at a time (for this
// timeline). If two threads tried to flush the same frozen
// Only one task may call this function at a time (for this
// timeline). If two tasks tried to flush the same frozen
// layer to disk at the same time, that would not work.
assert!(compare_arced_layers(&l.unwrap(), frozen_layer_for_check));
assert_eq!(Arc::as_ptr(&inmem), Arc::as_ptr(frozen_layer_for_check));
if let Some(delta_layer) = delta_layer {
Self::insert_historic_layer(Arc::new(delta_layer), &mut updates, &mut self.layer_fmgr);
if let Some(l) = delta_layer {
let mut updates = self.layer_map.batch_update();
Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
updates.flush();
}
updates.flush();
}
/// Called when compaction is completed.
pub(crate) fn finish_compact_l0(
&mut self,
layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
compact_from: Vec<Arc<dyn PersistentLayer>>,
compact_to: Vec<Arc<dyn PersistentLayer>>,
metrics: &TimelineMetrics,
layer_removal_cs: &Arc<tokio::sync::OwnedMutexGuard<()>>,
compact_from: Vec<Layer>,
compact_to: &[ResidentLayer],
duplicates: &[(ResidentLayer, ResidentLayer)],
) -> Result<()> {
let mut updates = self.layer_map.batch_update();
for l in compact_to {
Self::insert_historic_layer(l, &mut updates, &mut self.layer_fmgr);
Self::insert_historic_layer(l.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
}
for l in compact_from {
// NB: the layer file identified by descriptor `l` is guaranteed to be present
// in the LayerFileManager because compaction kept holding `layer_removal_cs` the entire
// time, even though we dropped `Timeline::layers` in between.
Self::delete_historic_layer(
layer_removal_cs.clone(),
l,
&mut updates,
metrics,
&mut self.layer_fmgr,
)?;
Self::delete_historic_layer(layer_removal_cs, l, &mut updates, &mut self.layer_fmgr)?;
}
for (old, new) in duplicates {
self.layer_fmgr.replace(old.as_ref(), new.as_ref().clone());
}
updates.flush();
Ok(())
@@ -223,28 +213,26 @@ impl LayerManager {
/// Called when garbage collecting the timeline. Returns a guard that will apply the updates to the layer map.
pub(crate) fn finish_gc_timeline(
&mut self,
layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
gc_layers: Vec<Arc<dyn PersistentLayer>>,
metrics: &TimelineMetrics,
layer_removal_cs: &Arc<tokio::sync::OwnedMutexGuard<()>>,
gc_layers: Vec<Layer>,
) -> Result<ApplyGcResultGuard> {
let mut updates = self.layer_map.batch_update();
for doomed_layer in gc_layers {
Self::delete_historic_layer(
layer_removal_cs.clone(),
layer_removal_cs,
doomed_layer,
&mut updates,
metrics,
&mut self.layer_fmgr,
)?; // FIXME: schedule succeeded deletions in timeline.rs `gc_timeline` instead of in batch?
)?;
}
Ok(ApplyGcResultGuard(updates))
}
/// Helper function to insert a layer into the layer map and file manager.
fn insert_historic_layer(
layer: Arc<dyn PersistentLayer>,
layer: Layer,
updates: &mut BatchedUpdates<'_>,
mapping: &mut LayerFileManager,
mapping: &mut LayerFileManager<Layer>,
) {
updates.insert_historic(layer.layer_desc().clone());
mapping.insert(layer);
@@ -254,17 +242,12 @@ impl LayerManager {
/// Remote storage is not affected by this operation.
fn delete_historic_layer(
// we cannot remove layers otherwise, since gc and compaction will race
_layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
layer: Arc<dyn PersistentLayer>,
_layer_removal_cs: &Arc<tokio::sync::OwnedMutexGuard<()>>,
layer: Layer,
updates: &mut BatchedUpdates<'_>,
metrics: &TimelineMetrics,
mapping: &mut LayerFileManager,
mapping: &mut LayerFileManager<Layer>,
) -> anyhow::Result<()> {
let desc = layer.layer_desc();
if !layer.is_remote_layer() {
layer.delete_resident_layer_file()?;
metrics.resident_physical_size_gauge.sub(desc.file_size);
}
// TODO Removing from the bottom of the layer map is expensive.
// Maybe instead discard all layer map historic versions that
@@ -272,22 +255,21 @@ impl LayerManager {
// and mark what we can't delete yet as deleted from the layer
// map index without actually rebuilding the index.
updates.remove_historic(desc);
mapping.remove(layer);
mapping.remove(&layer);
layer.garbage_collect_on_drop();
Ok(())
}
pub(crate) fn contains(&self, layer: &Arc<dyn PersistentLayer>) -> bool {
pub(crate) fn contains(&self, layer: &Layer) -> bool {
self.layer_fmgr.contains(layer)
}
}
pub(crate) struct LayerFileManager<T: AsLayerDesc + ?Sized = dyn PersistentLayer>(
HashMap<PersistentLayerKey, Arc<T>>,
);
pub(crate) struct LayerFileManager<T>(HashMap<PersistentLayerKey, T>);
impl<T: AsLayerDesc + ?Sized> LayerFileManager<T> {
fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Arc<T> {
impl<T: AsLayerDesc + Clone + PartialEq + std::fmt::Debug> LayerFileManager<T> {
fn get_from_desc(&self, desc: &PersistentLayerDesc) -> T {
// The assumption for the `expect()` is that all code maintains the following invariant:
// A layer's descriptor is present in the LayerMap => the LayerFileManager contains a layer for the descriptor.
self.0
@@ -297,14 +279,14 @@ impl<T: AsLayerDesc + ?Sized> LayerFileManager<T> {
.clone()
}
pub(crate) fn insert(&mut self, layer: Arc<T>) {
pub(crate) fn insert(&mut self, layer: T) {
let present = self.0.insert(layer.layer_desc().key(), layer.clone());
if present.is_some() && cfg!(debug_assertions) {
panic!("overwriting a layer: {:?}", layer.layer_desc())
}
}
pub(crate) fn contains(&self, layer: &Arc<T>) -> bool {
pub(crate) fn contains(&self, layer: &T) -> bool {
self.0.contains_key(&layer.layer_desc().key())
}
@@ -312,7 +294,7 @@ impl<T: AsLayerDesc + ?Sized> LayerFileManager<T> {
Self(HashMap::new())
}
pub(crate) fn remove(&mut self, layer: Arc<T>) {
pub(crate) fn remove(&mut self, layer: &T) {
let present = self.0.remove(&layer.layer_desc().key());
if present.is_none() && cfg!(debug_assertions) {
panic!(
@@ -322,38 +304,13 @@ impl<T: AsLayerDesc + ?Sized> LayerFileManager<T> {
}
}
pub(crate) fn replace_and_verify(&mut self, expected: Arc<T>, new: Arc<T>) -> Result<()> {
let key = expected.layer_desc().key();
let other = new.layer_desc().key();
pub(crate) fn replace(&mut self, old: &T, new: T) {
let key = old.layer_desc().key();
assert_eq!(key, new.layer_desc().key());
let expected_l0 = LayerMap::is_l0(expected.layer_desc());
let new_l0 = LayerMap::is_l0(new.layer_desc());
fail::fail_point!("layermap-replace-notfound", |_| anyhow::bail!(
"layermap-replace-notfound"
));
anyhow::ensure!(
key == other,
"expected and new layer have different keys: {key:?} != {other:?}"
);
anyhow::ensure!(
expected_l0 == new_l0,
"one layer is l0 while the other is not: {expected_l0} != {new_l0}"
);
if let Some(layer) = self.0.get_mut(&key) {
anyhow::ensure!(
compare_arced_layers(&expected, layer),
"another layer was found instead of expected, expected={expected:?}, new={new:?}",
expected = Arc::as_ptr(&expected),
new = Arc::as_ptr(layer),
);
*layer = new;
Ok(())
} else {
anyhow::bail!("layer was not found");
if let Some(existing) = self.0.get_mut(&key) {
assert_eq!(existing, old);
*existing = new;
}
}
}

View File

@@ -1,6 +1,7 @@
use crate::metrics::RemoteOpFileKind;
use super::storage_layer::LayerFileName;
use super::storage_layer::ResidentLayer;
use crate::tenant::metadata::TimelineMetadata;
use crate::tenant::remote_timeline_client::index::IndexPart;
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
@@ -210,7 +211,7 @@ pub(crate) struct Delete {
#[derive(Debug)]
pub(crate) enum UploadOp {
/// Upload a layer file
UploadLayer(LayerFileName, LayerFileMetadata),
UploadLayer(ResidentLayer, LayerFileMetadata),
/// Upload the metadata file
UploadMetadata(IndexPart, Lsn),
@@ -225,13 +226,8 @@ pub(crate) enum UploadOp {
impl std::fmt::Display for UploadOp {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
UploadOp::UploadLayer(path, metadata) => {
write!(
f,
"UploadLayer({}, size={:?})",
path.file_name(),
metadata.file_size()
)
UploadOp::UploadLayer(layer, metadata) => {
write!(f, "UploadLayer({}, size={:?})", layer, metadata.file_size())
}
UploadOp::UploadMetadata(_, lsn) => write!(f, "UploadMetadata(lsn: {})", lsn),
UploadOp::Delete(delete) => write!(

View File

@@ -1524,7 +1524,7 @@ class NeonPageserver(PgProtocol):
".*wait for layer upload ops to complete.*", # .*Caused by:.*wait_completion aborted because upload queue was stopped
".*gc_loop.*Gc failed, retrying in.*timeline is Stopping", # When gc checks timeline state after acquiring layer_removal_cs
".*gc_loop.*Gc failed, retrying in.*: Cannot run GC iteration on inactive tenant", # Tenant::gc precondition
".*compaction_loop.*Compaction failed, retrying in.*timeline is Stopping", # When compaction checks timeline state after acquiring layer_removal_cs
".*compaction_loop.*Compaction failed, retrying in.*timeline or pageserver is shutting down", # When compaction checks timeline state after acquiring layer_removal_cs
".*query handler for 'pagestream.*failed: Timeline .* was not found", # postgres reconnects while timeline_delete doesn't hold the tenant's timelines.lock()
".*query handler for 'pagestream.*failed: Timeline .* is not active", # timeline delete in progress
".*task iteration took longer than the configured period.*",

View File

@@ -22,7 +22,7 @@ def positive_env(neon_env_builder: NeonEnvBuilder) -> NeonEnv:
# eviction might be the first one after an attach to access the layers
env.pageserver.allowed_errors.append(
".*unexpectedly on-demand downloading remote layer remote.* for task kind Eviction"
".*unexpectedly on-demand downloading remote layer .* for task kind Eviction"
)
assert isinstance(env.remote_storage, LocalFsStorage)
return env

View File

@@ -15,7 +15,7 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder):
env.pageserver.allowed_errors.extend(
[
".*Failed to load delta layer.*",
".*layer loading failed:.*",
".*could not find data for key.*",
".*is not active. Current state: Broken.*",
".*will not become active. Current state: Broken.*",
@@ -99,7 +99,7 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder):
# Third timeline will also fail during basebackup, because the layer file is corrupt.
# It will fail when we try to read (and reconstruct) a page from it, ergo the error message.
# (We don't check layer file contents on startup, when loading the timeline)
with pytest.raises(Exception, match="Failed to load delta layer") as err:
with pytest.raises(Exception, match="layer loading failed:") as err:
pg3.start()
log.info(
f"As expected, compute startup failed for timeline {tenant3}/{timeline3} with corrupt layers: {err}"

View File

@@ -2,35 +2,137 @@ import time
import pytest
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin
from fixtures.pageserver.utils import wait_for_upload_queue_empty
from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind
from requests.exceptions import ConnectionError
# Test duplicate layer detection
#
# This test sets fail point at the end of first compaction phase:
# after flushing new L1 layers but before deletion of L0 layers
# it should cause generation of duplicate L1 layer by compaction after restart.
@pytest.mark.timeout(600)
def test_duplicate_layers(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
env = neon_env_builder.init_start()
pageserver_http = env.pageserver.http_client()
def test_compaction_duplicates_all(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
"""
Uses a failpoint to make compact_level0_phase1 return its input layers as the
output layers, as if those L0 inputs had all been recreated when the L1s were
supposed to be created.
"""
neon_env_builder.enable_remote_storage(
remote_storage_kind=RemoteStorageKind.LOCAL_FS,
test_name="test_compaction_duplicates_all",
)
# Use aggressive compaction and checkpoint settings
tenant_id, _ = env.neon_cli.create_tenant(
conf={
env = neon_env_builder.init_start(
initial_tenant_conf={
"checkpoint_distance": f"{1024 ** 2}",
"compaction_target_size": f"{1024 ** 2}",
"compaction_period": "5 s",
"compaction_period": "0 s",
"compaction_threshold": "3",
}
)
pageserver_http = env.pageserver.http_client()
tenant_id, timeline_id = env.initial_tenant, env.initial_timeline
pageserver_http.configure_failpoints(("compact-level0-phase1-return-same", "return"))
# pageserver_http.configure_failpoints(("after-timeline-compacted-first-L1", "exit"))
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
connstr = endpoint.connstr(options="-csynchronous_commit=off")
pg_bin.run_capture(["pgbench", "-i", "-s1", connstr])
time.sleep(10) # let compaction to be performed
pageserver_http.timeline_compact(tenant_id, timeline_id)
assert env.pageserver.log_contains("compact-level0-phase1-return-same")
pg_bin.run_capture(["pgbench", "-P1", "-N", "-c5", "-T200", "-Mprepared", connstr])
def test_duplicate_layers(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
"""
This test sets a failpoint at the end of the first compaction phase, after
flushing the new L1 layers but before deleting the L0 layers. After restart,
compaction should then generate a duplicate L1 layer.
"""
neon_env_builder.enable_remote_storage(
remote_storage_kind=RemoteStorageKind.LOCAL_FS,
test_name="test_duplicate_layers",
)
env = neon_env_builder.init_start(
initial_tenant_conf={
"checkpoint_distance": f"{1024 ** 2}",
"compaction_target_size": f"{1024 ** 2}",
"compaction_period": "0 s",
"compaction_threshold": "3",
}
)
pageserver_http = env.pageserver.http_client()
tenant_id, timeline_id = env.initial_tenant, env.initial_timeline
pageserver_http.configure_failpoints(("after-timeline-compacted-first-L1", "exit"))
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
connstr = endpoint.connstr(options="-csynchronous_commit=off")
pg_bin.run_capture(["pgbench", "-i", "-s1", connstr])
with pytest.raises(ConnectionError, match="Remote end closed connection without response"):
pageserver_http.timeline_compact(tenant_id, timeline_id)
# pageserver has already exited at this point
env.pageserver.stop()
# now the duplicate L1 has been created, but is not yet uploaded
assert isinstance(env.remote_storage, LocalFsStorage)
# path = env.remote_storage.timeline_path(tenant_id, timeline_id)
l1_found = None
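# Layer file names are "{key_start}-{key_end}__{lsn or lsn_start-lsn_end}":
# image layers have a single LSN after "__", delta layers an LSN range, and
# L0 delta layers span the full key range (36 zeros to 36 F's).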
for path in env.timeline_dir(tenant_id, timeline_id).iterdir():
if path.name == "metadata" or path.name.startswith("ephemeral-"):
continue
if len(path.suffixes) > 0:
# temp files
continue
[key_range, lsn_range] = path.name.split("__", maxsplit=1)
if "-" not in lsn_range:
# image layer
continue
[key_start, key_end] = key_range.split("-", maxsplit=1)
if key_start == "0" * 36 and key_end == "F" * 36:
# L0
continue
assert l1_found is None, f"found multiple L1: {l1_found.name} and {path.name}"
l1_found = path
assert l1_found is not None, "failed to find L1 locally"
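# stat()[8] is st_mtime, the file's last-modification time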
original_created_at = l1_found.stat()[8]
uploaded = env.remote_storage.timeline_path(tenant_id, timeline_id) / l1_found.name
assert not uploaded.exists(), "to-be-overwritten should not yet be uploaded"
# give room for fs timestamps
time.sleep(1)
env.pageserver.start()
warning = f".*duplicated L1 layer layer={l1_found.name}"
env.pageserver.allowed_errors.append(warning)
pageserver_http.timeline_compact(tenant_id, timeline_id)
# give time for log flush
time.sleep(1)
env.pageserver.log_contains(warning)
overwritten_at = l1_found.stat()[8]
assert original_created_at < overwritten_at, "expected the L1 to be overwritten"
wait_for_upload_queue_empty(pageserver_http, tenant_id, timeline_id)
uploaded_at = uploaded.stat()[8]
assert overwritten_at <= uploaded_at, "expected the L1 to finally be uploaded"
# why does compaction not wait for uploads? probably so that we can compact
# faster than we can upload in some cases.
#
# timeline_compact should wait for uploads as well

View File

@@ -256,34 +256,34 @@ def test_gc_of_remote_layers(neon_env_builder: NeonEnvBuilder):
ps_http.evict_all_layers(tenant_id, timeline_id)
def ensure_resident_and_remote_size_metrics():
log.info("ensure that all the layers are gone")
resident_layers = list(env.timeline_dir(tenant_id, timeline_id).glob("*-*_*"))
# we have disabled all background loops, so this should hold
assert len(resident_layers) == 0
assert len(resident_layers) == 0, "ensure that all the layers are gone"
info = ps_http.layer_map_info(tenant_id, timeline_id)
log.info("layer map dump: %s", info)
log.info("ensure that resident_physical_size metric is zero")
resident_physical_size_metric = ps_http.get_timeline_metric(
tenant_id, timeline_id, "pageserver_resident_physical_size"
)
assert resident_physical_size_metric == 0
log.info("ensure that resident_physical_size metric corresponds to layer map dump")
assert (
resident_physical_size_metric == 0
), "ensure that resident_physical_size metric is zero"
assert resident_physical_size_metric == sum(
[layer.layer_file_size or 0 for layer in info.historic_layers if not layer.remote]
)
layer.layer_file_size or 0 for layer in info.historic_layers if not layer.remote
), "ensure that resident_physical_size metric corresponds to layer map dump"
log.info("ensure that remote_physical_size metric matches layer map")
remote_physical_size_metric = ps_http.get_timeline_metric(
tenant_id, timeline_id, "pageserver_remote_physical_size"
)
log.info("ensure that remote_physical_size metric corresponds to layer map dump")
assert remote_physical_size_metric == sum(
layer.layer_file_size or 0 for layer in info.historic_layers if layer.remote
)
), "ensure that remote_physical_size metric corresponds to layer map dump"
log.info("before runnning GC, ensure that remote_physical size is zero")
# leaving index_part.json upload from successful compaction out will show
# up here as a mismatch between remove_physical_size and summed up layermap
# size
ensure_resident_and_remote_size_metrics()
log.info("run GC")

View File

@@ -13,13 +13,12 @@ from fixtures.neon_fixtures import (
last_flush_lsn_upload,
wait_for_last_flush_lsn,
)
from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.pageserver.utils import (
assert_tenant_state,
wait_for_last_record_lsn,
wait_for_upload,
wait_for_upload_queue_empty,
wait_until_tenant_state,
)
from fixtures.remote_storage import RemoteStorageKind, available_remote_storages
from fixtures.types import Lsn
@@ -391,7 +390,7 @@ def test_download_remote_layers_api(
env.pageserver.start(extra_env_vars={"FAILPOINTS": "remote-storage-download-pre-rename=return"})
env.pageserver.allowed_errors.extend(
[
f".*download_all_remote_layers.*{tenant_id}.*{timeline_id}.*layer download failed.*remote-storage-download-pre-rename failpoint",
".*download failed: downloading evicted layer file failed.*",
f".*initial size calculation.*{tenant_id}.*{timeline_id}.*Failed to calculate logical size",
]
)
@@ -658,62 +657,5 @@ def test_compaction_downloads_on_demand_with_image_creation(
assert dict(kinds_after) == {"Delta": 4, "Image": 1}
@pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.LOCAL_FS])
def test_ondemand_download_failure_to_replace(
neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind
):
"""
Make sure that we fail on being unable to replace a RemoteLayer instead of for example livelocking.
See: https://github.com/neondatabase/neon/issues/3533
"""
neon_env_builder.enable_remote_storage(
remote_storage_kind=remote_storage_kind,
test_name="test_ondemand_download_failure_to_replace",
)
# disable gc and compaction via default tenant config because config is lost while detaching
# so that compaction will not be the one to download the layer but the http handler is
neon_env_builder.pageserver_config_override = (
"""tenant_config={gc_period = "0s", compaction_period = "0s"}"""
)
env = neon_env_builder.init_start()
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
assert timeline_id is not None
pageserver_http = env.pageserver.http_client()
# remove layers so that they will be redownloaded
pageserver_http.tenant_detach(tenant_id)
pageserver_http.tenant_attach(tenant_id)
wait_until_tenant_state(pageserver_http, tenant_id, "Active", 5)
pageserver_http.configure_failpoints(("layermap-replace-notfound", "return"))
# requesting details with non-incremental size should trigger a download of the only layer
# this will need to be adjusted if an index for logical sizes is ever implemented
with pytest.raises(PageserverApiException):
# PageserverApiException is expected because of the failpoint (timeline_detail building does something)
# ReadTimeout can happen on our busy CI, but it should not, because there is no more busylooping
# but should it be added back, we would wait for 15s here.
pageserver_http.timeline_detail(tenant_id, timeline_id, True, timeout=15)
actual_message = ".* ERROR .*layermap-replace-notfound"
assert env.pageserver.log_contains(actual_message) is not None
env.pageserver.allowed_errors.append(actual_message)
env.pageserver.allowed_errors.append(
".* ERROR .*Error processing HTTP request: InternalServerError\\(get local timeline info"
)
# this might get to run and attempt on-demand, but not always
env.pageserver.allowed_errors.append(".* ERROR .*Task 'initial size calculation'")
# if the above returned, then we didn't have a livelock, and all is well
def stringify(conf: Dict[str, Any]) -> Dict[str, str]:
return dict(map(lambda x: (x[0], str(x[1])), conf.items()))

View File

@@ -603,7 +603,7 @@ def test_timeline_deletion_with_files_stuck_in_upload_queue(
log.info("sending delete request")
checkpoint_allowed_to_fail.set()
env.pageserver.allowed_errors.append(
".* ERROR .*Error processing HTTP request: InternalServerError\\(timeline is Stopping"
".* ERROR .*Error processing HTTP request: InternalServerError\\(The timeline or pageserver is shutting down"
)
# Generous timeout, because currently deletions can get blocked waiting for compaction
@@ -861,10 +861,8 @@ def test_compaction_delete_before_upload(
# Ensure that this actually terminates
wait_upload_queue_empty(client, tenant_id, timeline_id)
# For now we are hitting this message.
# Maybe in the future the underlying race condition will be fixed,
# but until then, ensure that this message is hit instead.
assert env.pageserver.log_contains(
# fixed in #4938
assert not env.pageserver.log_contains(
"File to upload doesn't exist. Likely the file has been deleted and an upload is not required any more."
)

View File

@@ -688,7 +688,7 @@ def test_ignored_tenant_stays_broken_without_metadata(
# temporarily detached produces these errors in the pageserver log.
env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*")
env.pageserver.allowed_errors.append(
f".*Tenant {tenant_id} will not become active\\. Current state: Broken.*"
f".*Tenant {tenant_id} will not become active\\. Current state: (Stopping|Broken).*"
)
# ignore the tenant and remove its metadata

View File

@@ -239,9 +239,7 @@ def test_tenant_redownloads_truncated_file_on_startup(
assert isinstance(env.remote_storage, LocalFsStorage)
env.pageserver.allowed_errors.append(
".*removing local file .* because it has unexpected length.*"
)
env.pageserver.allowed_errors.append(".*removing local file .* because .*")
# FIXME: Are these expected?
env.pageserver.allowed_errors.append(