Compare commits

..

12 Commits

Author SHA1 Message Date
Arpad Müller
a3ca7eeb23 Set set_launch_timestamp_metric also in safekeepers 2024-08-06 16:23:10 +02:00
a-masterov
078f941dc8 Add a test using Debezium as a client for the logical replication (#8568)
## Problem
We need to test the logical replication with some external consumers.
## Summary of changes
A test of the logical replication with Debezium as a consumer was added.
---------

Co-authored-by: Alexander Bayandin <alexander@neon.tech>
2024-08-06 13:08:55 +02:00
Arseny Sher
68bcbf8227 Add package-mode=false to poetry.
We don't use it for packaging, and 'poetry install' will soon error
otherwise. Also remove name and version fields as these are not required for
non-packaging mode.
2024-08-06 13:53:23 +03:00
Arpad Müller
a31c95cb40 storage_scrubber: migrate scan_safekeeper_metadata to remote_storage (#8595)
Migrates the safekeeper-specific parts of `ScanMetadata` to
GenericRemoteStorage, making it Azure-ready.
 
Part of https://github.com/neondatabase/neon/issues/7547
2024-08-06 10:51:39 +00:00
Joonas Koivunen
dc7eb5ae5a chore: bump index part version (#8611)
#8600 missed the hunk changing index_part.json informative version.
Include it in this PR, in addition add more non-warning index_part.json
versions to scrubber.
2024-08-06 11:45:41 +01:00
Vlad Lazar
44fedfd6c3 pageserver: remove legacy read path (#8601)
## Problem

We have been maintaining two read paths (legacy and vectored) for a
while now. The legacy read-path was only used for cross validation in some tests.

## Summary of changes
* Tweak all tests that were using the legacy read path to use the
vectored read path instead
* Remove the read path dispatching based on the pageserver configs
* Remove the legacy read path code

We will be able to remove the single blob io code in
`pageserver/src/tenant/blob_io.rs` when https://github.com/neondatabase/neon/issues/7386 is complete.

Closes https://github.com/neondatabase/neon/issues/8005
2024-08-06 10:14:01 +01:00
Joonas Koivunen
138f008bab feat: persistent gc blocking (#8600)
Currently, we do not have facilities to persistently block GC on a
tenant for whatever reason. We could do a tenant configuration update,
but that is risky for generation numbers and would also be transient.
Introduce a `gc_block` facility in the tenant, which manages per
timeline blocking reasons.

Additionally, add HTTP endpoints for enabling/disabling manual gc
blocking for a specific timeline. For debugging, individual tenant
status now includes a similar string representation logged when GC is
skipped.

Cc: #6994
2024-08-06 10:09:56 +01:00
Joonas Koivunen
6a6f30e378 fix: make Timeline::set_disk_consistent_lsn use fetch_max (#8311)
now it is safe to use from multiple callers, as we have two callers.
2024-08-06 08:52:01 +01:00
Alex Chi Z.
8f3bc5ae35 feat(pageserver): support dry-run for gc-compaction, add statistics (#8557)
Add dry-run mode that does not produce any image layer + delta layer. I
will use this code to do some experiments and see how much space we can
reclaim for tenants on staging. Part of
https://github.com/neondatabase/neon/issues/8002

* Add dry-run mode that runs the full compaction process without
updating the layer map. (We never call finish on the writers and the
files will be removed before exiting the function).
* Add compaction statistics and print them at the end of compaction.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-08-06 02:07:48 +00:00
Alexander Bayandin
e6e578821b CI(benchmarking): set pub/sub projects for LR tests (#8483)
## Problem

> Currently, long-running LR tests recreate endpoints every night. We'd
like to have along-running buildup of history to exercise the pageserver
in this case (instead of "unit-testing" the same behavior everynight).

Closes #8317

## Summary of changes
- Update Postgres version for replication tests
- Set `BENCHMARK_PROJECT_ID_PUB`/`BENCHMARK_PROJECT_ID_SUB` env vars to
projects that were created for this purpose

---------

Co-authored-by: Sasha Krassovsky <krassovskysasha@gmail.com>
2024-08-05 22:06:47 +00:00
Joonas Koivunen
c32807ac19 fix: allow awaiting logical size for root timelines (#8604)
Currently if `GET
/v1/tenant/x/timeline/y?force-await-initial-logical-size=true` is
requested for a root timeline created within the current pageserver
session, the request handler panics hitting the debug assertion. These
timelines will always have an accurate (at initdb import) calculated
logical size. Fix is to never attempt prioritizing timeline size
calculation if we already have an exact value.

Split off from #8528.
2024-08-05 21:21:33 +01:00
Alexander Bayandin
50daff9655 CI(trigger-e2e-tests): fix deadlock with Build and Test workflow (#8606)
## Problem

In some cases, a deadlock between `build-and-test` and
`trigger-e2e-tests` workflows can happen:

```
Build and Test

Canceling since a deadlock for concurrency group 'Build and Test-8600/merge-anysha' was detected between 'top level workflow' and 'trigger-e2e-tests'
```

I don't understand the reason completely, probably `${{ github.workflow
}}` got evaluated to the same value and somehow caused the issue.
We don't need to limit concurrency for `trigger-e2e-tests`
workflow.

See
https://neondb.slack.com/archives/C059ZC138NR/p1722869486708179?thread_ts=1722869027.960029&cid=C059ZC138NR
2024-08-05 19:47:59 +01:00
43 changed files with 1446 additions and 929 deletions

View File

@@ -8,6 +8,8 @@ self-hosted-runner:
- small-arm64
- us-east-2
config-variables:
- BENCHMARK_PROJECT_ID_PUB
- BENCHMARK_PROJECT_ID_SUB
- REMOTE_STORAGE_AZURE_CONTAINER
- REMOTE_STORAGE_AZURE_REGION
- SLACK_UPCOMING_RELEASE_CHANNEL_ID

View File

@@ -147,7 +147,7 @@ jobs:
if: ${{ github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null }}
env:
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
DEFAULT_PG_VERSION: 14
DEFAULT_PG_VERSION: 16
TEST_OUTPUT: /tmp/test_output
BUILD_TYPE: remote
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref_name == 'main' ) }}
@@ -168,7 +168,7 @@ jobs:
path: /tmp/neon/
prefix: latest
- name: Run benchmark
- name: Run Logical Replication benchmarks
uses: ./.github/actions/run-python-test-set
with:
build_type: ${{ env.BUILD_TYPE }}
@@ -176,12 +176,15 @@ jobs:
run_in_parallel: false
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 5400
pg_version: ${{ env.DEFAULT_PG_VERSION }}
env:
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
NEON_API_KEY: ${{ secrets.NEON_STAGING_API_KEY }}
BENCHMARK_PROJECT_ID_PUB: ${{ vars.BENCHMARK_PROJECT_ID_PUB }}
BENCHMARK_PROJECT_ID_SUB: ${{ vars.BENCHMARK_PROJECT_ID_SUB }}
- name: Run benchmark
- name: Run Physical Replication benchmarks
uses: ./.github/actions/run-python-test-set
with:
build_type: ${{ env.BUILD_TYPE }}

View File

@@ -66,7 +66,31 @@ jobs:
ports:
- 9000:9000
- 8123:8123
zookeeper:
image: quay.io/debezium/zookeeper:2.7
ports:
- 2181:2181
kafka:
image: quay.io/debezium/kafka:2.7
env:
ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_BROKER_ID: 1
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_JMX_PORT: 9991
ports:
- 9092:9092
debezium:
image: quay.io/debezium/connect:2.7
env:
BOOTSTRAP_SERVERS: kafka:9092
GROUP_ID: 1
CONFIG_STORAGE_TOPIC: debezium-config
OFFSET_STORAGE_TOPIC: debezium-offset
STATUS_STORAGE_TOPIC: debezium-status
DEBEZIUM_CONFIG_CONNECTOR_CLASS: io.debezium.connector.postgresql.PostgresConnector
ports:
- 8083:8083
steps:
- uses: actions/checkout@v4

View File

@@ -10,10 +10,6 @@ defaults:
run:
shell: bash -euxo pipefail {0}
concurrency:
group: ${{ github.workflow }}-${{ github.ref_name }}-${{ github.ref_name == 'main' && github.sha || 'anysha' }}
cancel-in-progress: true
env:
# A concurrency group that we use for e2e-tests runs, matches `concurrency.group` above with `github.repository` as a prefix
E2E_CONCURRENCY_GROUP: ${{ github.repository }}-e2e-tests-${{ github.ref_name }}-${{ github.ref_name == 'main' && github.sha || 'anysha' }}

View File

@@ -637,6 +637,13 @@ pub struct TenantInfo {
pub current_physical_size: Option<u64>, // physical size is only included in `tenant_status` endpoint
pub attachment_status: TenantAttachmentStatus,
pub generation: u32,
/// Opaque explanation if gc is being blocked.
///
/// Only looked up for the individual tenant detail, not the listing. This is purely for
/// debugging, not included in openapi.
#[serde(skip_serializing_if = "Option::is_none")]
pub gc_blocking: Option<String>,
}
#[derive(Serialize, Deserialize, Clone)]
@@ -1427,6 +1434,7 @@ mod tests {
current_physical_size: Some(42),
attachment_status: TenantAttachmentStatus::Attached,
generation: 1,
gc_blocking: None,
};
let expected_active = json!({
"id": original_active.id.to_string(),
@@ -1449,6 +1457,7 @@ mod tests {
current_physical_size: Some(42),
attachment_status: TenantAttachmentStatus::Attached,
generation: 1,
gc_blocking: None,
};
let expected_broken = json!({
"id": original_broken.id.to_string(),

View File

@@ -203,9 +203,8 @@ pub const XLR_BLOCK_ID_DATA_LONG: u8 = 254;
pub const XLR_BLOCK_ID_ORIGIN: u8 = 253;
pub const XLR_BLOCK_ID_TOPLEVEL_XID: u8 = 252;
pub const BKPBLOCK_FORK_MASK: u8 = 0x07;
pub const BKPBLOCK_FLAG_MASK: u8 = 0xF8;
pub const BKPBLOCK_OPAQUE: u8 = 0x08; /* page has no page header */
pub const BKPBLOCK_FORK_MASK: u8 = 0x0F;
pub const _BKPBLOCK_FLAG_MASK: u8 = 0xF0;
pub const BKPBLOCK_HAS_IMAGE: u8 = 0x10; /* block data is an XLogRecordBlockImage */
pub const BKPBLOCK_HAS_DATA: u8 = 0x20;
pub const BKPBLOCK_WILL_INIT: u8 = 0x40; /* redo will re-init the page */

View File

@@ -308,6 +308,45 @@ paths:
application/json:
schema:
type: string
/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/block_gc:
parameters:
- name: tenant_shard_id
in: path
required: true
schema:
type: string
- name: timeline_id
in: path
required: true
schema:
type: string
format: hex
post:
description: Persistently add a gc blocking at the tenant level because of this timeline
responses:
"200":
description: OK
/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/unblock_gc:
parameters:
- name: tenant_shard_id
in: path
required: true
schema:
type: string
- name: timeline_id
in: path
required: true
schema:
type: string
format: hex
post:
description: Persistently remove a tenant level gc blocking for this timeline
responses:
"200":
description: OK
/v1/tenant/{tenant_shard_id}/location_config:
parameters:
- name: tenant_shard_id

View File

@@ -935,6 +935,7 @@ async fn tenant_list_handler(
generation: (*gen)
.into()
.expect("Tenants are always attached with a generation"),
gc_blocking: None,
})
.collect::<Vec<TenantInfo>>();
@@ -986,6 +987,7 @@ async fn tenant_status(
.generation()
.into()
.expect("Tenants are always attached with a generation"),
gc_blocking: tenant.gc_block.summary().map(|x| format!("{x:?}")),
},
walredo: tenant.wal_redo_manager_status(),
timelines: tenant.list_timeline_ids(),
@@ -1226,6 +1228,72 @@ async fn evict_timeline_layer_handler(
}
}
async fn timeline_gc_blocking_handler(
request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
block_or_unblock_gc(request, true).await
}
async fn timeline_gc_unblocking_handler(
request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
block_or_unblock_gc(request, false).await
}
/// Adding a block is `POST ../block_gc`, removing a block is `POST ../unblock_gc`.
///
/// Both are technically unsafe because they might fire off index uploads, thus they are POST.
async fn block_or_unblock_gc(
request: Request<Body>,
block: bool,
) -> Result<Response<Body>, ApiError> {
use crate::tenant::{
remote_timeline_client::WaitCompletionError, upload_queue::NotInitialized,
};
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
let state = get_state(&request);
let tenant = state
.tenant_manager
.get_attached_tenant_shard(tenant_shard_id)?;
tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
let timeline = tenant.get_timeline(timeline_id, true)?;
let fut = async {
if block {
timeline.block_gc(&tenant).await.map(|_| ())
} else {
timeline.unblock_gc(&tenant).await
}
};
let span = tracing::info_span!(
"block_or_unblock_gc",
tenant_id = %tenant_shard_id.tenant_id,
shard_id = %tenant_shard_id.shard_slug(),
timeline_id = %timeline_id,
block = block,
);
let res = fut.instrument(span).await;
res.map_err(|e| {
if e.is::<NotInitialized>() || e.is::<WaitCompletionError>() {
ApiError::ShuttingDown
} else {
ApiError::InternalServerError(e)
}
})?;
json_response(StatusCode::OK, ())
}
/// Get tenant_size SVG graph along with the JSON data.
fn synthetic_size_html_response(
inputs: ModelInputs,
@@ -2904,6 +2972,14 @@ pub fn make_router(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/layer/:layer_file_name",
|r| api_handler(r, evict_timeline_layer_handler),
)
.post(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/block_gc",
|r| api_handler(r, timeline_gc_blocking_handler),
)
.post(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/unblock_gc",
|r| api_handler(r, timeline_gc_unblocking_handler),
)
.post("/v1/tenant/:tenant_shard_id/heatmap_upload", |r| {
api_handler(r, secondary_upload_handler)
})

View File

@@ -148,6 +148,7 @@ pub(crate) mod timeline;
pub mod size;
mod gc_block;
pub(crate) mod throttle;
pub(crate) use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
@@ -303,6 +304,12 @@ pub struct Tenant {
/// An ongoing timeline detach must be checked during attempts to GC or compact a timeline.
ongoing_timeline_detach: std::sync::Mutex<Option<(TimelineId, utils::completion::Barrier)>>,
/// `index_part.json` based gc blocking reason tracking.
///
/// New gc iterations must start a new iteration by acquiring `GcBlock::start` before
/// proceeding.
pub(crate) gc_block: gc_block::GcBlock,
l0_flush_global_state: L0FlushGlobalState,
}
@@ -1036,6 +1043,8 @@ impl Tenant {
}
}
let mut gc_blocks = HashMap::new();
// For every timeline, download the metadata file, scan the local directory,
// and build a layer map that contains an entry for each remote and local
// layer file.
@@ -1045,6 +1054,16 @@ impl Tenant {
.remove(&timeline_id)
.expect("just put it in above");
if let Some(blocking) = index_part.gc_blocking.as_ref() {
// could just filter these away, but it helps while testing
anyhow::ensure!(
!blocking.reasons.is_empty(),
"index_part for {timeline_id} is malformed: it should not have gc blocking with zero reasons"
);
let prev = gc_blocks.insert(timeline_id, blocking.reasons);
assert!(prev.is_none());
}
// TODO again handle early failure
self.load_remote_timeline(
timeline_id,
@@ -1089,6 +1108,8 @@ impl Tenant {
// IndexPart is the source of truth.
self.clean_up_timelines(&existent_timelines)?;
self.gc_block.set_scanned(gc_blocks);
fail::fail_point!("attach-before-activate", |_| {
anyhow::bail!("attach-before-activate");
});
@@ -1679,6 +1700,14 @@ impl Tenant {
}
}
let _guard = match self.gc_block.start().await {
Ok(guard) => guard,
Err(reasons) => {
info!("Skipping GC: {reasons}");
return Ok(GcResult::default());
}
};
self.gc_iteration_internal(target_timeline_id, horizon, pitr, cancel, ctx)
.await
}
@@ -2691,6 +2720,7 @@ impl Tenant {
)),
tenant_conf: Arc::new(ArcSwap::from_pointee(attached_conf)),
ongoing_timeline_detach: std::sync::Mutex::default(),
gc_block: Default::default(),
l0_flush_global_state,
}
}
@@ -4092,7 +4122,7 @@ pub(crate) mod harness {
#[cfg(test)]
mod tests {
use std::collections::BTreeMap;
use std::collections::{BTreeMap, BTreeSet};
use super::*;
use crate::keyspace::KeySpaceAccum;
@@ -4767,7 +4797,7 @@ mod tests {
lsn: Lsn,
repeat: usize,
key_count: usize,
) -> anyhow::Result<()> {
) -> anyhow::Result<HashMap<Key, BTreeSet<Lsn>>> {
let compact = true;
bulk_insert_maybe_compact_gc(tenant, timeline, ctx, lsn, repeat, key_count, compact).await
}
@@ -4780,7 +4810,9 @@ mod tests {
repeat: usize,
key_count: usize,
compact: bool,
) -> anyhow::Result<()> {
) -> anyhow::Result<HashMap<Key, BTreeSet<Lsn>>> {
let mut inserted: HashMap<Key, BTreeSet<Lsn>> = Default::default();
let mut test_key = Key::from_hex("010000000033333333444444445500000000").unwrap();
let mut blknum = 0;
@@ -4801,6 +4833,7 @@ mod tests {
ctx,
)
.await?;
inserted.entry(test_key).or_default().insert(lsn);
writer.finish_write(lsn);
drop(writer);
@@ -4825,7 +4858,7 @@ mod tests {
assert_eq!(res.layers_removed, 0, "this never removes anything");
}
Ok(())
Ok(inserted)
}
//
@@ -4872,7 +4905,7 @@ mod tests {
.await?;
let lsn = Lsn(0x10);
bulk_insert_compact_gc(&tenant, &tline, &ctx, lsn, 50, 10000).await?;
let inserted = bulk_insert_compact_gc(&tenant, &tline, &ctx, lsn, 50, 10000).await?;
let guard = tline.layers.read().await;
guard.layer_map().dump(true, &ctx).await?;
@@ -4933,9 +4966,39 @@ mod tests {
&ctx,
)
.await;
tline
.validate_get_vectored_impl(&vectored_res, read, reads_lsn, &ctx)
.await;
let mut expected_lsns: HashMap<Key, Lsn> = Default::default();
let mut expect_missing = false;
let mut key = read.start().unwrap();
while key != read.end().unwrap() {
if let Some(lsns) = inserted.get(&key) {
let expected_lsn = lsns.iter().rfind(|lsn| **lsn <= reads_lsn);
match expected_lsn {
Some(lsn) => {
expected_lsns.insert(key, *lsn);
}
None => {
expect_missing = true;
break;
}
}
} else {
expect_missing = true;
break;
}
key = key.next();
}
if expect_missing {
assert!(matches!(vectored_res, Err(GetVectoredError::MissingKey(_))));
} else {
for (key, image) in vectored_res? {
let expected_lsn = expected_lsns.get(&key).expect("determined above");
let expected_image = test_img(&format!("{} at {}", key.field6, expected_lsn));
assert_eq!(image?, expected_image);
}
}
}
Ok(())
@@ -4985,10 +5048,6 @@ mod tests {
)
.await;
child_timeline
.validate_get_vectored_impl(&vectored_res, aux_keyspace, read_lsn, &ctx)
.await;
let images = vectored_res?;
assert!(images.is_empty());
Ok(())
@@ -6899,7 +6958,10 @@ mod tests {
}
let cancel = CancellationToken::new();
tline.compact_with_gc(&cancel, &ctx).await.unwrap();
tline
.compact_with_gc(&cancel, EnumSet::new(), &ctx)
.await
.unwrap();
for (idx, expected) in expected_result.iter().enumerate() {
assert_eq!(
@@ -6993,7 +7055,10 @@ mod tests {
guard.cutoffs.time = Lsn(0x40);
guard.cutoffs.space = Lsn(0x40);
}
tline.compact_with_gc(&cancel, &ctx).await.unwrap();
tline
.compact_with_gc(&cancel, EnumSet::new(), &ctx)
.await
.unwrap();
Ok(())
}
@@ -7327,7 +7392,10 @@ mod tests {
}
let cancel = CancellationToken::new();
tline.compact_with_gc(&cancel, &ctx).await.unwrap();
tline
.compact_with_gc(&cancel, EnumSet::new(), &ctx)
.await
.unwrap();
for idx in 0..10 {
assert_eq!(
@@ -7353,7 +7421,10 @@ mod tests {
guard.cutoffs.time = Lsn(0x40);
guard.cutoffs.space = Lsn(0x40);
}
tline.compact_with_gc(&cancel, &ctx).await.unwrap();
tline
.compact_with_gc(&cancel, EnumSet::new(), &ctx)
.await
.unwrap();
Ok(())
}
@@ -7898,11 +7969,28 @@ mod tests {
verify_result().await;
let cancel = CancellationToken::new();
tline.compact_with_gc(&cancel, &ctx).await.unwrap();
let mut dryrun_flags = EnumSet::new();
dryrun_flags.insert(CompactFlags::DryRun);
tline
.compact_with_gc(&cancel, dryrun_flags, &ctx)
.await
.unwrap();
// We expect layer map to be the same b/c the dry run flag, but we don't know whether there will be other background jobs
// cleaning things up, and therefore, we don't do sanity checks on the layer map during unit tests.
verify_result().await;
tline
.compact_with_gc(&cancel, EnumSet::new(), &ctx)
.await
.unwrap();
verify_result().await;
// compact again
tline.compact_with_gc(&cancel, &ctx).await.unwrap();
tline
.compact_with_gc(&cancel, EnumSet::new(), &ctx)
.await
.unwrap();
verify_result().await;
// increase GC horizon and compact again
@@ -7912,11 +8000,17 @@ mod tests {
guard.cutoffs.time = Lsn(0x38);
guard.cutoffs.space = Lsn(0x38);
}
tline.compact_with_gc(&cancel, &ctx).await.unwrap();
tline
.compact_with_gc(&cancel, EnumSet::new(), &ctx)
.await
.unwrap();
verify_result().await; // no wals between 0x30 and 0x38, so we should obtain the same result
// not increasing the GC horizon and compact again
tline.compact_with_gc(&cancel, &ctx).await.unwrap();
tline
.compact_with_gc(&cancel, EnumSet::new(), &ctx)
.await
.unwrap();
verify_result().await;
Ok(())
@@ -8097,7 +8191,10 @@ mod tests {
verify_result().await;
let cancel = CancellationToken::new();
branch_tline.compact_with_gc(&cancel, &ctx).await.unwrap();
branch_tline
.compact_with_gc(&cancel, EnumSet::new(), &ctx)
.await
.unwrap();
verify_result().await;

View File

@@ -0,0 +1,213 @@
use std::collections::HashMap;
use utils::id::TimelineId;
use super::remote_timeline_client::index::GcBlockingReason;
type Storage = HashMap<TimelineId, enumset::EnumSet<GcBlockingReason>>;
#[derive(Default)]
pub(crate) struct GcBlock {
/// The timelines which have current reasons to block gc.
///
/// LOCK ORDER: this is held locked while scheduling the next index_part update. This is done
/// to keep the this field up to date with RemoteTimelineClient `upload_queue.dirty`.
reasons: std::sync::Mutex<Storage>,
blocking: tokio::sync::Mutex<()>,
}
impl GcBlock {
/// Start another gc iteration.
///
/// Returns a guard to be held for the duration of gc iteration to allow synchronizing with
/// it's ending, or if not currently possible, a value describing the reasons why not.
///
/// Cancellation safe.
pub(super) async fn start(&self) -> Result<Guard<'_>, BlockingReasons> {
let reasons = {
let g = self.reasons.lock().unwrap();
// TODO: the assumption is that this method gets called periodically. in prod, we use 1h, in
// tests, we use everything. we should warn if the gc has been consecutively blocked
// for more than 1h (within single tenant session?).
BlockingReasons::clean_and_summarize(g)
};
if let Some(reasons) = reasons {
Err(reasons)
} else {
Ok(Guard {
_inner: self.blocking.lock().await,
})
}
}
pub(crate) fn summary(&self) -> Option<BlockingReasons> {
let g = self.reasons.lock().unwrap();
BlockingReasons::summarize(&g)
}
/// Start blocking gc for this one timeline for the given reason.
///
/// This is not a guard based API but instead it mimics set API. The returned future will not
/// resolve until an existing gc round has completed.
///
/// Returns true if this block was new, false if gc was already blocked for this reason.
///
/// Cancellation safe: cancelling after first poll will keep the reason to block gc, but will
/// keep the gc blocking reason.
pub(crate) async fn insert(
&self,
timeline: &super::Timeline,
reason: GcBlockingReason,
) -> anyhow::Result<bool> {
let (added, uploaded) = {
let mut g = self.reasons.lock().unwrap();
let set = g.entry(timeline.timeline_id).or_default();
let added = set.insert(reason);
// LOCK ORDER: intentionally hold the lock, see self.reasons.
let uploaded = timeline
.remote_client
.schedule_insert_gc_block_reason(reason)?;
(added, uploaded)
};
uploaded.await?;
// ensure that any ongoing gc iteration has completed
drop(self.blocking.lock().await);
Ok(added)
}
/// Remove blocking gc for this one timeline and the given reason.
pub(crate) async fn remove(
&self,
timeline: &super::Timeline,
reason: GcBlockingReason,
) -> anyhow::Result<()> {
use std::collections::hash_map::Entry;
super::span::debug_assert_current_span_has_tenant_and_timeline_id();
let (remaining_blocks, uploaded) = {
let mut g = self.reasons.lock().unwrap();
match g.entry(timeline.timeline_id) {
Entry::Occupied(mut oe) => {
let set = oe.get_mut();
set.remove(reason);
if set.is_empty() {
oe.remove();
}
}
Entry::Vacant(_) => {
// we must still do the index_part.json update regardless, in case we had earlier
// been cancelled
}
}
let remaining_blocks = g.len();
// LOCK ORDER: intentionally hold the lock while scheduling; see self.reasons
let uploaded = timeline
.remote_client
.schedule_remove_gc_block_reason(reason)?;
(remaining_blocks, uploaded)
};
uploaded.await?;
// no need to synchronize with gc iteration again
if remaining_blocks > 0 {
tracing::info!(remaining_blocks, removed=?reason, "gc blocking removed, but gc remains blocked");
} else {
tracing::info!("gc is now unblocked for the tenant");
}
Ok(())
}
pub(crate) fn before_delete(&self, timeline: &super::Timeline) {
let unblocked = {
let mut g = self.reasons.lock().unwrap();
if g.is_empty() {
return;
}
g.remove(&timeline.timeline_id);
BlockingReasons::clean_and_summarize(g).is_none()
};
if unblocked {
tracing::info!("gc is now unblocked following deletion");
}
}
/// Initialize with the non-deleted timelines of this tenant.
pub(crate) fn set_scanned(&self, scanned: Storage) {
let mut g = self.reasons.lock().unwrap();
assert!(g.is_empty());
g.extend(scanned.into_iter().filter(|(_, v)| !v.is_empty()));
if let Some(reasons) = BlockingReasons::clean_and_summarize(g) {
tracing::info!(summary=?reasons, "initialized with gc blocked");
}
}
}
pub(super) struct Guard<'a> {
_inner: tokio::sync::MutexGuard<'a, ()>,
}
#[derive(Debug)]
pub(crate) struct BlockingReasons {
timelines: usize,
reasons: enumset::EnumSet<GcBlockingReason>,
}
impl std::fmt::Display for BlockingReasons {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{} timelines block for {:?}",
self.timelines, self.reasons
)
}
}
impl BlockingReasons {
fn clean_and_summarize(mut g: std::sync::MutexGuard<'_, Storage>) -> Option<Self> {
let mut reasons = enumset::EnumSet::empty();
g.retain(|_key, value| {
reasons = reasons.union(*value);
!value.is_empty()
});
if !g.is_empty() {
Some(BlockingReasons {
timelines: g.len(),
reasons,
})
} else {
None
}
}
fn summarize(g: &std::sync::MutexGuard<'_, Storage>) -> Option<Self> {
if g.is_empty() {
None
} else {
let reasons = g
.values()
.fold(enumset::EnumSet::empty(), |acc, next| acc.union(*next));
Some(BlockingReasons {
timelines: g.len(),
reasons,
})
}
}
}

View File

@@ -800,6 +800,123 @@ impl RemoteTimelineClient {
.context("wait completion")
}
/// Adds a gc blocking reason for this timeline if one does not exist already.
///
/// A retryable step of timeline detach ancestor.
///
/// Returns a future which waits until the completion of the upload.
pub(crate) fn schedule_insert_gc_block_reason(
self: &Arc<Self>,
reason: index::GcBlockingReason,
) -> Result<impl std::future::Future<Output = Result<(), WaitCompletionError>>, NotInitialized>
{
let maybe_barrier = {
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;
if let index::GcBlockingReason::DetachAncestor = reason {
if upload_queue.dirty.metadata.ancestor_timeline().is_none() {
drop(guard);
panic!("cannot start detach ancestor if there is nothing to detach from");
}
}
let wanted = |x: Option<&index::GcBlocking>| x.is_some_and(|x| x.blocked_by(reason));
let current = upload_queue.dirty.gc_blocking.as_ref();
let uploaded = upload_queue.clean.0.gc_blocking.as_ref();
match (current, uploaded) {
(x, y) if wanted(x) && wanted(y) => None,
(x, y) if wanted(x) && !wanted(y) => Some(self.schedule_barrier0(upload_queue)),
// Usual case: !wanted(x) && !wanted(y)
//
// Unusual: !wanted(x) && wanted(y) which means we have two processes waiting to
// turn on and off some reason.
(x, y) => {
if !wanted(x) && wanted(y) {
// this could be avoided by having external in-memory synchronization, like
// timeline detach ancestor
warn!(?reason, op="insert", "unexpected: two racing processes to enable and disable a gc blocking reason");
}
// at this point, the metadata must always show that there is a parent
upload_queue.dirty.gc_blocking = current
.map(|x| x.with_reason(reason))
.or_else(|| Some(index::GcBlocking::started_now_for(reason)));
self.schedule_index_upload(upload_queue)?;
Some(self.schedule_barrier0(upload_queue))
}
}
};
Ok(async move {
if let Some(barrier) = maybe_barrier {
Self::wait_completion0(barrier).await?;
}
Ok(())
})
}
/// Removes a gc blocking reason for this timeline if one exists.
///
/// A retryable step of timeline detach ancestor.
///
/// Returns a future which waits until the completion of the upload.
pub(crate) fn schedule_remove_gc_block_reason(
self: &Arc<Self>,
reason: index::GcBlockingReason,
) -> Result<impl std::future::Future<Output = Result<(), WaitCompletionError>>, NotInitialized>
{
let maybe_barrier = {
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;
if let index::GcBlockingReason::DetachAncestor = reason {
if !upload_queue
.clean
.0
.lineage
.is_detached_from_original_ancestor()
{
drop(guard);
panic!("cannot complete timeline_ancestor_detach while not detached");
}
}
let wanted = |x: Option<&index::GcBlocking>| {
x.is_none() || x.is_some_and(|b| !b.blocked_by(reason))
};
let current = upload_queue.dirty.gc_blocking.as_ref();
let uploaded = upload_queue.clean.0.gc_blocking.as_ref();
match (current, uploaded) {
(x, y) if wanted(x) && wanted(y) => None,
(x, y) if wanted(x) && !wanted(y) => Some(self.schedule_barrier0(upload_queue)),
(x, y) => {
if !wanted(x) && wanted(y) {
warn!(?reason, op="remove", "unexpected: two racing processes to enable and disable a gc blocking reason (remove)");
}
upload_queue.dirty.gc_blocking =
current.as_ref().and_then(|x| x.without_reason(reason));
assert!(wanted(upload_queue.dirty.gc_blocking.as_ref()));
// FIXME: bogus ?
self.schedule_index_upload(upload_queue)?;
Some(self.schedule_barrier0(upload_queue))
}
}
};
Ok(async move {
if let Some(barrier) = maybe_barrier {
Self::wait_completion0(barrier).await?;
}
Ok(())
})
}
/// Launch an upload operation in the background; the file is added to be included in next
/// `index_part.json` upload.
pub(crate) fn schedule_layer_file_upload(

View File

@@ -60,6 +60,9 @@ pub struct IndexPart {
#[serde(default)]
pub(crate) lineage: Lineage,
#[serde(skip_serializing_if = "Option::is_none", default)]
pub(crate) gc_blocking: Option<GcBlocking>,
/// Describes the kind of aux files stored in the timeline.
///
/// The value is modified during file ingestion when the latest wanted value communicated via tenant config is applied if it is acceptable.
@@ -85,10 +88,11 @@ impl IndexPart {
/// - 6: last_aux_file_policy is added.
/// - 7: metadata_bytes is no longer written, but still read
/// - 8: added `archived_at`
const LATEST_VERSION: usize = 8;
/// - 9: +gc_blocking
const LATEST_VERSION: usize = 9;
// Versions we may see when reading from a bucket.
pub const KNOWN_VERSIONS: &'static [usize] = &[1, 2, 3, 4, 5, 6, 7, 8];
pub const KNOWN_VERSIONS: &'static [usize] = &[1, 2, 3, 4, 5, 6, 7, 8, 9];
pub const FILE_NAME: &'static str = "index_part.json";
@@ -101,6 +105,7 @@ impl IndexPart {
deleted_at: None,
archived_at: None,
lineage: Default::default(),
gc_blocking: None,
last_aux_file_policy: None,
}
}
@@ -251,6 +256,64 @@ impl Lineage {
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct GcBlocking {
pub(crate) started_at: NaiveDateTime,
pub(crate) reasons: enumset::EnumSet<GcBlockingReason>,
}
#[derive(Debug, enumset::EnumSetType, serde::Serialize, serde::Deserialize)]
#[enumset(serialize_repr = "list")]
pub(crate) enum GcBlockingReason {
Manual,
DetachAncestor,
}
impl GcBlocking {
pub(super) fn started_now_for(reason: GcBlockingReason) -> Self {
GcBlocking {
started_at: chrono::Utc::now().naive_utc(),
reasons: enumset::EnumSet::only(reason),
}
}
/// Returns true if the given reason is one of the reasons why the gc is blocked.
pub(crate) fn blocked_by(&self, reason: GcBlockingReason) -> bool {
self.reasons.contains(reason)
}
/// Returns a version of self with the given reason.
pub(super) fn with_reason(&self, reason: GcBlockingReason) -> Self {
assert!(!self.blocked_by(reason));
let mut reasons = self.reasons;
reasons.insert(reason);
Self {
started_at: self.started_at,
reasons,
}
}
/// Returns a version of self without the given reason. Assumption is that if
/// there are no more reasons, we can unblock the gc by returning `None`.
pub(super) fn without_reason(&self, reason: GcBlockingReason) -> Option<Self> {
assert!(self.blocked_by(reason));
if self.reasons.len() == 1 {
None
} else {
let mut reasons = self.reasons;
assert!(reasons.remove(reason));
assert!(!reasons.is_empty());
Some(Self {
started_at: self.started_at,
reasons,
})
}
}
}
#[cfg(test)]
mod tests {
use super::*;
@@ -292,6 +355,7 @@ mod tests {
deleted_at: None,
archived_at: None,
lineage: Lineage::default(),
gc_blocking: None,
last_aux_file_policy: None,
};
@@ -335,6 +399,7 @@ mod tests {
deleted_at: None,
archived_at: None,
lineage: Lineage::default(),
gc_blocking: None,
last_aux_file_policy: None,
};
@@ -379,6 +444,7 @@ mod tests {
deleted_at: Some(parse_naive_datetime("2023-07-31T09:00:00.123000000")),
archived_at: None,
lineage: Lineage::default(),
gc_blocking: None,
last_aux_file_policy: None,
};
@@ -426,6 +492,7 @@ mod tests {
deleted_at: None,
archived_at: None,
lineage: Lineage::default(),
gc_blocking: None,
last_aux_file_policy: None,
};
@@ -468,6 +535,7 @@ mod tests {
deleted_at: Some(parse_naive_datetime("2023-07-31T09:00:00.123000000")),
archived_at: None,
lineage: Lineage::default(),
gc_blocking: None,
last_aux_file_policy: None,
};
@@ -513,6 +581,7 @@ mod tests {
reparenting_history: vec![TimelineId::from_str("e1bfd8c633d713d279e6fcd2bcc15b6d").unwrap()],
original_ancestor: Some((TimelineId::from_str("e2bfd8c633d713d279e6fcd2bcc15b6d").unwrap(), Lsn::from_str("0/15A7618").unwrap(), parse_naive_datetime("2024-05-07T18:52:36.322426563"))),
},
gc_blocking: None,
last_aux_file_policy: None,
};
@@ -563,6 +632,7 @@ mod tests {
reparenting_history: vec![TimelineId::from_str("e1bfd8c633d713d279e6fcd2bcc15b6d").unwrap()],
original_ancestor: Some((TimelineId::from_str("e2bfd8c633d713d279e6fcd2bcc15b6d").unwrap(), Lsn::from_str("0/15A7618").unwrap(), parse_naive_datetime("2024-05-07T18:52:36.322426563"))),
},
gc_blocking: None,
last_aux_file_policy: Some(AuxFilePolicy::V2),
};
@@ -618,6 +688,7 @@ mod tests {
deleted_at: Some(parse_naive_datetime("2023-07-31T09:00:00.123000000")),
archived_at: None,
lineage: Default::default(),
gc_blocking: None,
last_aux_file_policy: Default::default(),
};
@@ -674,6 +745,7 @@ mod tests {
deleted_at: Some(parse_naive_datetime("2023-07-31T09:00:00.123000000")),
archived_at: Some(parse_naive_datetime("2023-04-29T09:00:00.123000000")),
lineage: Default::default(),
gc_blocking: None,
last_aux_file_policy: Default::default(),
};
@@ -681,6 +753,68 @@ mod tests {
assert_eq!(part, expected);
}
#[test]
fn v9_indexpart_is_parsed() {
let example = r#"{
"version": 9,
"layer_metadata":{
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9": { "file_size": 25600000 },
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51": { "file_size": 9007199254741001 }
},
"disk_consistent_lsn":"0/16960E8",
"metadata": {
"disk_consistent_lsn": "0/16960E8",
"prev_record_lsn": "0/1696070",
"ancestor_timeline": "e45a7f37d3ee2ff17dc14bf4f4e3f52e",
"ancestor_lsn": "0/0",
"latest_gc_cutoff_lsn": "0/1696070",
"initdb_lsn": "0/1696070",
"pg_version": 14
},
"gc_blocking": {
"started_at": "2024-07-19T09:00:00.123",
"reasons": ["DetachAncestor"]
}
}"#;
let expected = IndexPart {
version: 9,
layer_metadata: HashMap::from([
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded()
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
file_size: 9007199254741001,
generation: Generation::none(),
shard: ShardIndex::unsharded()
})
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
metadata: TimelineMetadata::new(
Lsn::from_str("0/16960E8").unwrap(),
Some(Lsn::from_str("0/1696070").unwrap()),
Some(TimelineId::from_str("e45a7f37d3ee2ff17dc14bf4f4e3f52e").unwrap()),
Lsn::INVALID,
Lsn::from_str("0/1696070").unwrap(),
Lsn::from_str("0/1696070").unwrap(),
14,
).with_recalculated_checksum().unwrap(),
deleted_at: None,
lineage: Default::default(),
gc_blocking: Some(GcBlocking {
started_at: parse_naive_datetime("2024-07-19T09:00:00.123000000"),
reasons: enumset::EnumSet::from_iter([GcBlockingReason::DetachAncestor]),
}),
last_aux_file_policy: Default::default(),
archived_at: None,
};
let part = IndexPart::from_s3_bytes(example.as_bytes()).unwrap();
assert_eq!(part, expected);
}
fn parse_naive_datetime(s: &str) -> NaiveDateTime {
chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S.%f").unwrap()
}

View File

@@ -435,21 +435,6 @@ impl ReadableLayer {
}
}
/// Return value from [`Layer::get_value_reconstruct_data`]
#[derive(Clone, Copy, Debug)]
pub enum ValueReconstructResult {
/// Got all the data needed to reconstruct the requested page
Complete,
/// This layer didn't contain all the required data, the caller should look up
/// the predecessor layer at the returned LSN and collect more data from there.
Continue,
/// This layer didn't contain data needed to reconstruct the page version at
/// the returned LSN. This is usually considered an error, but might be OK
/// in some circumstances.
Missing,
}
/// Layers contain a hint indicating whether they are likely to be used for reads. This is a hint rather
/// than an authoritative value, so that we do not have to update it synchronously when changing the visibility
/// of layers (for example when creating a branch that makes some previously covered layers visible). It should

View File

@@ -36,7 +36,7 @@ use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, Fi
use crate::tenant::disk_btree::{
DiskBtreeBuilder, DiskBtreeIterator, DiskBtreeReader, VisitDirection,
};
use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
use crate::tenant::storage_layer::Layer;
use crate::tenant::timeline::GetVectoredError;
use crate::tenant::vectored_blob_io::{
BlobFlag, MaxVectoredReadBytes, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
@@ -826,95 +826,6 @@ impl DeltaLayerInner {
})
}
pub(super) async fn get_value_reconstruct_data(
&self,
key: Key,
lsn_range: Range<Lsn>,
reconstruct_state: &mut ValueReconstructState,
ctx: &RequestContext,
) -> anyhow::Result<ValueReconstructResult> {
let mut need_image = true;
// Scan the page versions backwards, starting from `lsn`.
let block_reader = FileBlockReader::new(&self.file, self.file_id);
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
self.index_start_blk,
self.index_root_blk,
&block_reader,
);
let search_key = DeltaKey::from_key_lsn(&key, Lsn(lsn_range.end.0 - 1));
let mut offsets: Vec<(Lsn, u64)> = Vec::new();
tree_reader
.visit(
&search_key.0,
VisitDirection::Backwards,
|key, value| {
let blob_ref = BlobRef(value);
if key[..KEY_SIZE] != search_key.0[..KEY_SIZE] {
return false;
}
let entry_lsn = DeltaKey::extract_lsn_from_buf(key);
if entry_lsn < lsn_range.start {
return false;
}
offsets.push((entry_lsn, blob_ref.pos()));
!blob_ref.will_init()
},
&RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::DeltaLayerBtreeNode)
.build(),
)
.await?;
let ctx = &RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::DeltaLayerValue)
.build();
// Ok, 'offsets' now contains the offsets of all the entries we need to read
let cursor = block_reader.block_cursor();
let mut buf = Vec::new();
for (entry_lsn, pos) in offsets {
cursor
.read_blob_into_buf(pos, &mut buf, ctx)
.await
.with_context(|| {
format!("Failed to read blob from virtual file {}", self.file.path)
})?;
let val = Value::des(&buf).with_context(|| {
format!(
"Failed to deserialize file blob from virtual file {}",
self.file.path
)
})?;
match val {
Value::Image(img) => {
reconstruct_state.img = Some((entry_lsn, img));
need_image = false;
break;
}
Value::WalRecord(rec) => {
let will_init = rec.will_init();
reconstruct_state.records.push((entry_lsn, rec));
if will_init {
// This WAL record initializes the page, so no need to go further back
need_image = false;
break;
}
}
}
}
// If an older page image is needed to reconstruct the page, let the
// caller know.
if need_image {
Ok(ValueReconstructResult::Continue)
} else {
Ok(ValueReconstructResult::Complete)
}
}
// Look up the keys in the provided keyspace and update
// the reconstruct state with whatever is found.
//

View File

@@ -32,9 +32,7 @@ use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader};
use crate::tenant::disk_btree::{
DiskBtreeBuilder, DiskBtreeIterator, DiskBtreeReader, VisitDirection,
};
use crate::tenant::storage_layer::{
LayerAccessStats, ValueReconstructResult, ValueReconstructState,
};
use crate::tenant::storage_layer::LayerAccessStats;
use crate::tenant::timeline::GetVectoredError;
use crate::tenant::vectored_blob_io::{
BlobFlag, MaxVectoredReadBytes, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
@@ -429,46 +427,6 @@ impl ImageLayerInner {
})
}
pub(super) async fn get_value_reconstruct_data(
&self,
key: Key,
reconstruct_state: &mut ValueReconstructState,
ctx: &RequestContext,
) -> anyhow::Result<ValueReconstructResult> {
let block_reader = FileBlockReader::new(&self.file, self.file_id);
let tree_reader =
DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, &block_reader);
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
key.write_to_byte_slice(&mut keybuf);
if let Some(offset) = tree_reader
.get(
&keybuf,
&RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::ImageLayerBtreeNode)
.build(),
)
.await?
{
let blob = block_reader
.block_cursor()
.read_blob(
offset,
&RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::ImageLayerValue)
.build(),
)
.await
.with_context(|| format!("failed to read value from offset {}", offset))?;
let value = Bytes::from(blob);
reconstruct_state.img = Some((self.lsn, value));
Ok(ValueReconstructResult::Complete)
} else {
Ok(ValueReconstructResult::Missing)
}
}
// Look up the keys in the provided keyspace and update
// the reconstruct state with whatever is found.
pub(super) async fn get_values_reconstruct_data(
@@ -753,6 +711,10 @@ struct ImageLayerWriterInner {
}
impl ImageLayerWriterInner {
fn size(&self) -> u64 {
self.tree.borrow_writer().size() + self.blob_writer.size()
}
///
/// Start building a new image layer.
///
@@ -1044,6 +1006,10 @@ impl ImageLayerWriter {
.finish(timeline, ctx, Some(end_key))
.await
}
pub(crate) fn size(&self) -> u64 {
self.inner.as_ref().unwrap().size()
}
}
impl Drop for ImageLayerWriter {

View File

@@ -10,11 +10,10 @@ use crate::page_cache::PAGE_SZ;
use crate::repository::{Key, Value};
use crate::tenant::block_io::{BlockCursor, BlockReader, BlockReaderRef};
use crate::tenant::ephemeral_file::EphemeralFile;
use crate::tenant::storage_layer::ValueReconstructResult;
use crate::tenant::timeline::GetVectoredError;
use crate::tenant::{PageReconstructError, Timeline};
use crate::{l0_flush, page_cache, walrecord};
use anyhow::{anyhow, ensure, Result};
use anyhow::{anyhow, Result};
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::InMemoryLayerInfo;
use pageserver_api::shard::TenantShardId;
@@ -33,10 +32,7 @@ use std::sync::atomic::Ordering as AtomicOrdering;
use std::sync::atomic::{AtomicU64, AtomicUsize};
use tokio::sync::{RwLock, RwLockWriteGuard};
use super::{
DeltaLayerWriter, ResidentLayer, ValueReconstructSituation, ValueReconstructState,
ValuesReconstructState,
};
use super::{DeltaLayerWriter, ResidentLayer, ValueReconstructSituation, ValuesReconstructState};
#[derive(Debug, PartialEq, Eq, Clone, Copy, Hash)]
pub(crate) struct InMemoryLayerFileId(page_cache::FileId);
@@ -55,9 +51,6 @@ pub struct InMemoryLayer {
/// Writes are only allowed when this is `None`.
pub(crate) end_lsn: OnceLock<Lsn>,
/// Used for traversal path. Cached representation of the in-memory layer before frozen.
local_path_str: Arc<str>,
/// Used for traversal path. Cached representation of the in-memory layer after frozen.
frozen_local_path_str: OnceLock<Arc<str>>,
@@ -248,12 +241,6 @@ impl InMemoryLayer {
self.start_lsn..self.end_lsn_or_max()
}
pub(crate) fn local_path_str(&self) -> &Arc<str> {
self.frozen_local_path_str
.get()
.unwrap_or(&self.local_path_str)
}
/// debugging function to print out the contents of the layer
///
/// this is likely completly unused
@@ -303,60 +290,6 @@ impl InMemoryLayer {
Ok(())
}
/// Look up given value in the layer.
pub(crate) async fn get_value_reconstruct_data(
&self,
key: Key,
lsn_range: Range<Lsn>,
reconstruct_state: &mut ValueReconstructState,
ctx: &RequestContext,
) -> anyhow::Result<ValueReconstructResult> {
ensure!(lsn_range.start >= self.start_lsn);
let mut need_image = true;
let ctx = RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::InMemoryLayer)
.build();
let inner = self.inner.read().await;
let reader = inner.file.block_cursor();
// Scan the page versions backwards, starting from `lsn`.
if let Some(vec_map) = inner.index.get(&key) {
let slice = vec_map.slice_range(lsn_range);
for (entry_lsn, pos) in slice.iter().rev() {
let buf = reader.read_blob(*pos, &ctx).await?;
let value = Value::des(&buf)?;
match value {
Value::Image(img) => {
reconstruct_state.img = Some((*entry_lsn, img));
return Ok(ValueReconstructResult::Complete);
}
Value::WalRecord(rec) => {
let will_init = rec.will_init();
reconstruct_state.records.push((*entry_lsn, rec));
if will_init {
// This WAL record initializes the page, so no need to go further back
need_image = false;
break;
}
}
}
}
}
// release lock on 'inner'
// If an older page image is needed to reconstruct the page, let the
// caller know.
if need_image {
Ok(ValueReconstructResult::Continue)
} else {
Ok(ValueReconstructResult::Complete)
}
}
// Look up the keys in the provided keyspace and update
// the reconstruct state with whatever is found.
//
@@ -458,11 +391,6 @@ impl InMemoryLayer {
Ok(InMemoryLayer {
file_id: key,
local_path_str: {
let mut buf = String::new();
inmem_layer_log_display(&mut buf, timeline_id, start_lsn, Lsn::MAX).unwrap();
buf.into()
},
frozen_local_path_str: OnceLock::new(),
conf,
timeline_id,

View File

@@ -24,8 +24,7 @@ use super::delta_layer::{self, DeltaEntry};
use super::image_layer::{self};
use super::{
AsLayerDesc, ImageLayerWriter, LayerAccessStats, LayerAccessStatsReset, LayerName,
LayerVisibilityHint, PersistentLayerDesc, ValueReconstructResult, ValueReconstructState,
ValuesReconstructState,
LayerVisibilityHint, PersistentLayerDesc, ValuesReconstructState,
};
use utils::generation::Generation;
@@ -301,42 +300,6 @@ impl Layer {
self.0.delete_on_drop();
}
/// Return data needed to reconstruct given page at LSN.
///
/// It is up to the caller to collect more data from the previous layer and
/// perform WAL redo, if necessary.
///
/// # Cancellation-Safety
///
/// This method is cancellation-safe.
pub(crate) async fn get_value_reconstruct_data(
&self,
key: Key,
lsn_range: Range<Lsn>,
reconstruct_data: &mut ValueReconstructState,
ctx: &RequestContext,
) -> anyhow::Result<ValueReconstructResult> {
use anyhow::ensure;
let layer = self.0.get_or_maybe_download(true, Some(ctx)).await?;
self.0.access_stats.record_access(ctx);
if self.layer_desc().is_delta {
ensure!(lsn_range.start >= self.layer_desc().lsn_range.start);
ensure!(self.layer_desc().key_range.contains(&key));
} else {
ensure!(self.layer_desc().key_range.contains(&key));
ensure!(lsn_range.start >= self.layer_desc().image_layer_lsn());
ensure!(lsn_range.end >= self.layer_desc().image_layer_lsn());
}
layer
.get_value_reconstruct_data(key, lsn_range, reconstruct_data, &self.0, ctx)
.instrument(tracing::debug_span!("get_value_reconstruct_data", layer=%self))
.await
.with_context(|| format!("get_value_reconstruct_data for layer {self}"))
}
pub(crate) async fn get_values_reconstruct_data(
&self,
keyspace: KeySpace,
@@ -441,10 +404,6 @@ impl Layer {
&self.0.path
}
pub(crate) fn debug_str(&self) -> &Arc<str> {
&self.0.debug_str
}
pub(crate) fn metadata(&self) -> LayerFileMetadata {
self.0.metadata()
}
@@ -519,7 +478,7 @@ impl Layer {
///
/// However when we want something evicted, we cannot evict it right away as there might be current
/// reads happening on it. For example: it has been searched from [`LayerMap::search`] but not yet
/// read with [`Layer::get_value_reconstruct_data`].
/// read with [`Layer::get_values_reconstruct_data`].
///
/// [`LayerMap::search`]: crate::tenant::layer_map::LayerMap::search
#[derive(Debug)]
@@ -600,9 +559,6 @@ struct LayerInner {
/// Full path to the file; unclear if this should exist anymore.
path: Utf8PathBuf,
/// String representation of the layer, used for traversal id.
debug_str: Arc<str>,
desc: PersistentLayerDesc,
/// Timeline access is needed for remote timeline client and metrics.
@@ -836,9 +792,6 @@ impl LayerInner {
LayerInner {
conf,
debug_str: {
format!("timelines/{}/{}", timeline.timeline_id, desc.layer_name()).into()
},
path: local_path,
desc,
timeline: Arc::downgrade(timeline),
@@ -1759,28 +1712,6 @@ impl DownloadedLayer {
.map_err(|e| anyhow::anyhow!("layer load failed earlier: {e}"))
}
async fn get_value_reconstruct_data(
&self,
key: Key,
lsn_range: Range<Lsn>,
reconstruct_data: &mut ValueReconstructState,
owner: &Arc<LayerInner>,
ctx: &RequestContext,
) -> anyhow::Result<ValueReconstructResult> {
use LayerKind::*;
match self.get(owner, ctx).await? {
Delta(d) => {
d.get_value_reconstruct_data(key, lsn_range, reconstruct_data, ctx)
.await
}
Image(i) => {
i.get_value_reconstruct_data(key, reconstruct_data, ctx)
.await
}
}
}
async fn get_values_reconstruct_data(
&self,
keyspace: KeySpace,

View File

@@ -50,13 +50,26 @@ async fn smoke_test() {
// all layers created at pageserver are like `layer`, initialized with strong
// Arc<DownloadedLayer>.
let controlfile_keyspace = KeySpace {
ranges: vec![CONTROLFILE_KEY..CONTROLFILE_KEY.next()],
};
let img_before = {
let mut data = ValueReconstructState::default();
let mut data = ValuesReconstructState::default();
layer
.get_value_reconstruct_data(CONTROLFILE_KEY, Lsn(0x10)..Lsn(0x11), &mut data, &ctx)
.get_values_reconstruct_data(
controlfile_keyspace.clone(),
Lsn(0x10)..Lsn(0x11),
&mut data,
&ctx,
)
.await
.unwrap();
data.img
data.keys
.remove(&CONTROLFILE_KEY)
.expect("must be present")
.expect("should not error")
.img
.take()
.expect("tenant harness writes the control file")
};
@@ -74,13 +87,24 @@ async fn smoke_test() {
// on accesses when the layer is evicted, it will automatically be downloaded.
let img_after = {
let mut data = ValueReconstructState::default();
let mut data = ValuesReconstructState::default();
layer
.get_value_reconstruct_data(CONTROLFILE_KEY, Lsn(0x10)..Lsn(0x11), &mut data, &ctx)
.get_values_reconstruct_data(
controlfile_keyspace.clone(),
Lsn(0x10)..Lsn(0x11),
&mut data,
&ctx,
)
.instrument(download_span.clone())
.await
.unwrap();
data.img.take().unwrap()
data.keys
.remove(&CONTROLFILE_KEY)
.expect("must be present")
.expect("should not error")
.img
.take()
.expect("tenant harness writes the control file")
};
assert_eq!(img_before, img_after);
@@ -830,7 +854,7 @@ async fn eviction_cancellation_on_drop() {
fn layer_size() {
assert_eq!(size_of::<LayerAccessStats>(), 8);
assert_eq!(size_of::<PersistentLayerDesc>(), 104);
assert_eq!(size_of::<LayerInner>(), 312);
assert_eq!(size_of::<LayerInner>(), 296);
// it also has the utf8 path
}

View File

@@ -22,8 +22,8 @@ use handle::ShardTimelineId;
use once_cell::sync::Lazy;
use pageserver_api::{
key::{
AUX_FILES_KEY, KEY_SIZE, METADATA_KEY_BEGIN_PREFIX, METADATA_KEY_END_PREFIX,
NON_INHERITED_RANGE, NON_INHERITED_SPARSE_RANGE,
KEY_SIZE, METADATA_KEY_BEGIN_PREFIX, METADATA_KEY_END_PREFIX, NON_INHERITED_RANGE,
NON_INHERITED_SPARSE_RANGE,
},
keyspace::{KeySpaceAccum, KeySpaceRandomAccum, SparseKeyPartitioning},
models::{
@@ -59,10 +59,7 @@ use std::{
collections::{BTreeMap, HashMap, HashSet},
sync::atomic::AtomicU64,
};
use std::{
cmp::{max, min},
ops::ControlFlow,
};
use std::{cmp::min, ops::ControlFlow};
use std::{
collections::btree_map::Entry,
ops::{Deref, Range},
@@ -87,8 +84,8 @@ use crate::{
disk_usage_eviction_task::finite_f32,
tenant::storage_layer::{
AsLayerDesc, DeltaLayerWriter, EvictionError, ImageLayerWriter, InMemoryLayer, Layer,
LayerAccessStatsReset, LayerName, ResidentLayer, ValueReconstructResult,
ValueReconstructState, ValuesReconstructState,
LayerAccessStatsReset, LayerName, ResidentLayer, ValueReconstructState,
ValuesReconstructState,
},
};
use crate::{
@@ -543,7 +540,6 @@ pub struct MissingKeyError {
cont_lsn: Lsn,
request_lsn: Lsn,
ancestor_lsn: Option<Lsn>,
traversal_path: Vec<TraversalPathItem>,
backtrace: Option<std::backtrace::Backtrace>,
}
@@ -564,18 +560,6 @@ impl std::fmt::Display for MissingKeyError {
write!(f, ", ancestor {}", ancestor_lsn)?;
}
if !self.traversal_path.is_empty() {
writeln!(f)?;
}
for (r, c, l) in &self.traversal_path {
writeln!(
f,
"layer traversal: result {:?}, cont_lsn {}, layer: {}",
r, c, l,
)?;
}
if let Some(ref backtrace) = self.backtrace {
write!(f, "\n{}", backtrace)?;
}
@@ -704,6 +688,7 @@ pub(crate) enum CompactFlags {
ForceRepartition,
ForceImageLayerCreation,
EnhancedGcBottomMostCompaction,
DryRun,
}
impl std::fmt::Debug for Timeline {
@@ -917,119 +902,44 @@ impl Timeline {
self.timeline_get_throttle.throttle(ctx, 1).await;
match self.conf.get_impl {
GetImpl::Legacy => {
let reconstruct_state = ValueReconstructState {
records: Vec::new(),
img: None,
};
let keyspace = KeySpace {
ranges: vec![key..key.next()],
};
self.get_impl(key, lsn, reconstruct_state, ctx).await
}
GetImpl::Vectored => {
let keyspace = KeySpace {
ranges: vec![key..key.next()],
};
// Initialise the reconstruct state for the key with the cache
// entry returned above.
let mut reconstruct_state = ValuesReconstructState::new();
// Initialise the reconstruct state for the key with the cache
// entry returned above.
let mut reconstruct_state = ValuesReconstructState::new();
let vectored_res = self
.get_vectored_impl(keyspace.clone(), lsn, &mut reconstruct_state, ctx)
.await;
let vectored_res = self
.get_vectored_impl(keyspace.clone(), lsn, &mut reconstruct_state, ctx)
.await;
if self.conf.validate_vectored_get {
self.validate_get_vectored_impl(&vectored_res, keyspace, lsn, ctx)
.await;
}
let key_value = vectored_res?.pop_first();
match key_value {
Some((got_key, value)) => {
if got_key != key {
error!(
"Expected {}, but singular vectored get returned {}",
key, got_key
);
Err(PageReconstructError::Other(anyhow!(
"Singular vectored get returned wrong key"
)))
} else {
value
}
}
None => Err(PageReconstructError::MissingKey(MissingKeyError {
key,
shard: self.shard_identity.get_shard_number(&key),
cont_lsn: Lsn(0),
request_lsn: lsn,
ancestor_lsn: None,
traversal_path: Vec::new(),
backtrace: None,
})),
let key_value = vectored_res?.pop_first();
match key_value {
Some((got_key, value)) => {
if got_key != key {
error!(
"Expected {}, but singular vectored get returned {}",
key, got_key
);
Err(PageReconstructError::Other(anyhow!(
"Singular vectored get returned wrong key"
)))
} else {
value
}
}
None => Err(PageReconstructError::MissingKey(MissingKeyError {
key,
shard: self.shard_identity.get_shard_number(&key),
cont_lsn: Lsn(0),
request_lsn: lsn,
ancestor_lsn: None,
backtrace: None,
})),
}
}
/// Not subject to [`Self::timeline_get_throttle`].
async fn get_impl(
&self,
key: Key,
lsn: Lsn,
mut reconstruct_state: ValueReconstructState,
ctx: &RequestContext,
) -> Result<Bytes, PageReconstructError> {
// XXX: structured stats collection for layer eviction here.
trace!(
"get page request for {}@{} from task kind {:?}",
key,
lsn,
ctx.task_kind()
);
let timer = crate::metrics::GET_RECONSTRUCT_DATA_TIME
.for_get_kind(GetKind::Singular)
.start_timer();
let path = self
.get_reconstruct_data(key, lsn, &mut reconstruct_state, ctx)
.await?;
timer.stop_and_record();
let start = Instant::now();
let res = self.reconstruct_value(key, lsn, reconstruct_state).await;
let elapsed = start.elapsed();
crate::metrics::RECONSTRUCT_TIME
.for_get_kind(GetKind::Singular)
.observe(elapsed.as_secs_f64());
if cfg!(feature = "testing")
&& res.is_err()
&& !matches!(res, Err(PageReconstructError::Cancelled))
{
// it can only be walredo issue
use std::fmt::Write;
let mut msg = String::new();
path.into_iter().for_each(|(res, cont_lsn, layer)| {
writeln!(
msg,
"- layer traversal: result {res:?}, cont_lsn {cont_lsn}, layer: {}",
layer,
)
.expect("string grows")
});
// this is to rule out or provide evidence that we could in some cases read a duplicate
// walrecord
tracing::info!("walredo failed, path:\n{msg}");
}
res
}
pub(crate) const MAX_GET_VECTORED_KEYS: u64 = 32;
pub(crate) const VEC_GET_LAYERS_VISITED_WARN_THRESH: f64 = 512.0;
@@ -1079,28 +989,14 @@ impl Timeline {
.throttle(ctx, key_count as usize)
.await;
let res = match self.conf.get_vectored_impl {
GetVectoredImpl::Sequential => {
self.get_vectored_sequential_impl(keyspace, lsn, ctx).await
}
GetVectoredImpl::Vectored => {
let vectored_res = self
.get_vectored_impl(
keyspace.clone(),
lsn,
&mut ValuesReconstructState::new(),
ctx,
)
.await;
if self.conf.validate_vectored_get {
self.validate_get_vectored_impl(&vectored_res, keyspace, lsn, ctx)
.await;
}
vectored_res
}
};
let res = self
.get_vectored_impl(
keyspace.clone(),
lsn,
&mut ValuesReconstructState::new(),
ctx,
)
.await;
if let Some((metric, start)) = start {
let elapsed = start.elapsed();
@@ -1189,65 +1085,6 @@ impl Timeline {
vectored_res
}
/// Not subject to [`Self::timeline_get_throttle`].
pub(super) async fn get_vectored_sequential_impl(
&self,
keyspace: KeySpace,
lsn: Lsn,
ctx: &RequestContext,
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
let mut values = BTreeMap::new();
for range in keyspace.ranges {
let mut key = range.start;
while key != range.end {
let block = self
.get_impl(key, lsn, ValueReconstructState::default(), ctx)
.await;
use PageReconstructError::*;
match block {
Err(Cancelled) => return Err(GetVectoredError::Cancelled),
Err(MissingKey(_))
if NON_INHERITED_RANGE.contains(&key)
|| NON_INHERITED_SPARSE_RANGE.contains(&key) =>
{
// Ignore missing key error for aux key range. TODO: currently, we assume non_inherited_range == aux_key_range.
// When we add more types of keys into the page server, we should revisit this part of code and throw errors
// accordingly.
key = key.next();
}
Err(MissingKey(err)) => {
return Err(GetVectoredError::MissingKey(err));
}
Err(Other(err))
if err
.to_string()
.contains("downloading evicted layer file failed") =>
{
return Err(GetVectoredError::Other(err))
}
Err(Other(err))
if err
.chain()
.any(|cause| cause.to_string().contains("layer loading failed")) =>
{
// The intent here is to achieve error parity with the vectored read path.
// When vectored read fails to load a layer it fails the whole read, hence
// we mimic this behaviour here to keep the validation happy.
return Err(GetVectoredError::Other(err));
}
_ => {
values.insert(key, block);
key = key.next();
}
}
}
}
Ok(values)
}
pub(super) async fn get_vectored_impl(
&self,
keyspace: KeySpace,
@@ -1318,113 +1155,6 @@ impl Timeline {
Ok(results)
}
/// Not subject to [`Self::timeline_get_throttle`].
pub(super) async fn validate_get_vectored_impl(
&self,
vectored_res: &Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError>,
keyspace: KeySpace,
lsn: Lsn,
ctx: &RequestContext,
) {
if keyspace.overlaps(&Key::metadata_key_range()) {
// skip validation for metadata key range
return;
}
let sequential_res = self
.get_vectored_sequential_impl(keyspace.clone(), lsn, ctx)
.await;
fn errors_match(lhs: &GetVectoredError, rhs: &GetVectoredError) -> bool {
use GetVectoredError::*;
match (lhs, rhs) {
(Oversized(l), Oversized(r)) => l == r,
(InvalidLsn(l), InvalidLsn(r)) => l == r,
(MissingKey(l), MissingKey(r)) => l.key == r.key,
(GetReadyAncestorError(_), GetReadyAncestorError(_)) => true,
(Other(_), Other(_)) => true,
_ => false,
}
}
match (&sequential_res, vectored_res) {
(Err(GetVectoredError::Cancelled), _) => {},
(_, Err(GetVectoredError::Cancelled)) => {},
(Err(seq_err), Ok(_)) => {
panic!(concat!("Sequential get failed with {}, but vectored get did not",
" - keyspace={:?} lsn={}"),
seq_err, keyspace, lsn) },
(Ok(_), Err(GetVectoredError::GetReadyAncestorError(GetReadyAncestorError::AncestorLsnTimeout(_)))) => {
// Sequential get runs after vectored get, so it is possible for the later
// to time out while waiting for its ancestor's Lsn to become ready and for the
// former to succeed (it essentially has a doubled wait time).
},
(Ok(_), Err(vec_err)) => {
panic!(concat!("Vectored get failed with {}, but sequential get did not",
" - keyspace={:?} lsn={}"),
vec_err, keyspace, lsn) },
(Err(seq_err), Err(vec_err)) => {
assert!(errors_match(seq_err, vec_err),
"Mismatched errors: {seq_err} != {vec_err} - keyspace={keyspace:?} lsn={lsn}")},
(Ok(seq_values), Ok(vec_values)) => {
seq_values.iter().zip(vec_values.iter()).for_each(|((seq_key, seq_res), (vec_key, vec_res))| {
assert_eq!(seq_key, vec_key);
match (seq_res, vec_res) {
(Ok(seq_blob), Ok(vec_blob)) => {
Self::validate_key_equivalence(seq_key, &keyspace, lsn, seq_blob, vec_blob);
},
(Err(err), Ok(_)) => {
panic!(
concat!("Sequential get failed with {} for key {}, but vectored get did not",
" - keyspace={:?} lsn={}"),
err, seq_key, keyspace, lsn) },
(Ok(_), Err(err)) => {
panic!(
concat!("Vectored get failed with {} for key {}, but sequential get did not",
" - keyspace={:?} lsn={}"),
err, seq_key, keyspace, lsn) },
(Err(_), Err(_)) => {}
}
})
}
}
}
fn validate_key_equivalence(
key: &Key,
keyspace: &KeySpace,
lsn: Lsn,
seq: &Bytes,
vec: &Bytes,
) {
if *key == AUX_FILES_KEY {
// The value reconstruct of AUX_FILES_KEY from records is not deterministic
// since it uses a hash map under the hood. Hence, deserialise both results
// before comparing.
let seq_aux_dir_res = AuxFilesDirectory::des(seq);
let vec_aux_dir_res = AuxFilesDirectory::des(vec);
match (&seq_aux_dir_res, &vec_aux_dir_res) {
(Ok(seq_aux_dir), Ok(vec_aux_dir)) => {
assert_eq!(
seq_aux_dir, vec_aux_dir,
"Mismatch for key {} - keyspace={:?} lsn={}",
key, keyspace, lsn
);
}
(Err(_), Err(_)) => {}
_ => {
panic!("Mismatch for {key}: {seq_aux_dir_res:?} != {vec_aux_dir_res:?}");
}
}
} else {
// All other keys should reconstruct deterministically, so we simply compare the blobs.
assert_eq!(
seq, vec,
"Image mismatch for key {key} - keyspace={keyspace:?} lsn={lsn}"
);
}
}
/// Get last or prev record separately. Same as get_last_record_rlsn().last/prev.
pub(crate) fn get_last_record_lsn(&self) -> Lsn {
self.last_record_lsn.load().last
@@ -3214,228 +2944,7 @@ impl Timeline {
}
}
type TraversalId = Arc<str>;
trait TraversalLayerExt {
fn traversal_id(&self) -> TraversalId;
}
impl TraversalLayerExt for Layer {
fn traversal_id(&self) -> TraversalId {
Arc::clone(self.debug_str())
}
}
impl TraversalLayerExt for Arc<InMemoryLayer> {
fn traversal_id(&self) -> TraversalId {
Arc::clone(self.local_path_str())
}
}
impl Timeline {
///
/// Get a handle to a Layer for reading.
///
/// The returned Layer might be from an ancestor timeline, if the
/// segment hasn't been updated on this timeline yet.
///
/// This function takes the current timeline's locked LayerMap as an argument,
/// so callers can avoid potential race conditions.
///
/// # Cancel-Safety
///
/// This method is cancellation-safe.
async fn get_reconstruct_data(
&self,
key: Key,
request_lsn: Lsn,
reconstruct_state: &mut ValueReconstructState,
ctx: &RequestContext,
) -> Result<Vec<TraversalPathItem>, PageReconstructError> {
// Start from the current timeline.
let mut timeline_owned;
let mut timeline = self;
let mut read_count = scopeguard::guard(0, |cnt| {
crate::metrics::READ_NUM_LAYERS_VISITED.observe(cnt as f64)
});
// For debugging purposes, collect the path of layers that we traversed
// through. It's included in the error message if we fail to find the key.
let mut traversal_path = Vec::<TraversalPathItem>::new();
let cached_lsn = if let Some((cached_lsn, _)) = &reconstruct_state.img {
*cached_lsn
} else {
Lsn(0)
};
// 'prev_lsn' tracks the last LSN that we were at in our search. It's used
// to check that each iteration make some progress, to break infinite
// looping if something goes wrong.
let mut prev_lsn = None;
let mut result = ValueReconstructResult::Continue;
let mut cont_lsn = Lsn(request_lsn.0 + 1);
'outer: loop {
if self.cancel.is_cancelled() {
return Err(PageReconstructError::Cancelled);
}
// The function should have updated 'state'
//info!("CALLED for {} at {}: {:?} with {} records, cached {}", key, cont_lsn, result, reconstruct_state.records.len(), cached_lsn);
match result {
ValueReconstructResult::Complete => return Ok(traversal_path),
ValueReconstructResult::Continue => {
// If we reached an earlier cached page image, we're done.
if cont_lsn == cached_lsn + 1 {
return Ok(traversal_path);
}
if let Some(prev) = prev_lsn {
if prev <= cont_lsn {
// Didn't make any progress in last iteration. Error out to avoid
// getting stuck in the loop.
return Err(PageReconstructError::MissingKey(MissingKeyError {
key,
shard: self.shard_identity.get_shard_number(&key),
cont_lsn: Lsn(cont_lsn.0 - 1),
request_lsn,
ancestor_lsn: Some(timeline.ancestor_lsn),
traversal_path,
backtrace: None,
}));
}
}
prev_lsn = Some(cont_lsn);
}
ValueReconstructResult::Missing => {
return Err(PageReconstructError::MissingKey(MissingKeyError {
key,
shard: self.shard_identity.get_shard_number(&key),
cont_lsn,
request_lsn,
ancestor_lsn: None,
traversal_path,
backtrace: if cfg!(test) {
Some(std::backtrace::Backtrace::force_capture())
} else {
None
},
}));
}
}
// Recurse into ancestor if needed
if let Some(ancestor_timeline) = timeline.ancestor_timeline.as_ref() {
if key.is_inherited_key() && Lsn(cont_lsn.0 - 1) <= timeline.ancestor_lsn {
trace!(
"going into ancestor {}, cont_lsn is {}",
timeline.ancestor_lsn,
cont_lsn
);
timeline_owned = timeline
.get_ready_ancestor_timeline(ancestor_timeline, ctx)
.await?;
timeline = &*timeline_owned;
prev_lsn = None;
continue 'outer;
}
}
let guard = timeline.layers.read().await;
let layers = guard.layer_map();
// Check the open and frozen in-memory layers first, in order from newest
// to oldest.
if let Some(open_layer) = &layers.open_layer {
let start_lsn = open_layer.get_lsn_range().start;
if cont_lsn > start_lsn {
//info!("CHECKING for {} at {} on open layer {}", key, cont_lsn, open_layer.layer_name().display());
// Get all the data needed to reconstruct the page version from this layer.
// But if we have an older cached page image, no need to go past that.
let lsn_floor = max(cached_lsn + 1, start_lsn);
let open_layer = open_layer.clone();
drop(guard);
result = match open_layer
.get_value_reconstruct_data(
key,
lsn_floor..cont_lsn,
reconstruct_state,
ctx,
)
.await
{
Ok(result) => result,
Err(e) => return Err(PageReconstructError::from(e)),
};
cont_lsn = lsn_floor;
*read_count += 1;
traversal_path.push((result, cont_lsn, open_layer.traversal_id()));
continue 'outer;
}
}
for frozen_layer in layers.frozen_layers.iter().rev() {
let start_lsn = frozen_layer.get_lsn_range().start;
if cont_lsn > start_lsn {
//info!("CHECKING for {} at {} on frozen layer {}", key, cont_lsn, frozen_layer.layer_name().display());
let lsn_floor = max(cached_lsn + 1, start_lsn);
let frozen_layer = frozen_layer.clone();
drop(guard);
result = match frozen_layer
.get_value_reconstruct_data(
key,
lsn_floor..cont_lsn,
reconstruct_state,
ctx,
)
.await
{
Ok(result) => result,
Err(e) => return Err(PageReconstructError::from(e)),
};
cont_lsn = lsn_floor;
*read_count += 1;
traversal_path.push((result, cont_lsn, frozen_layer.traversal_id()));
continue 'outer;
}
}
if let Some(SearchResult { lsn_floor, layer }) = layers.search(key, cont_lsn) {
let layer = guard.get_from_desc(&layer);
drop(guard);
// Get all the data needed to reconstruct the page version from this layer.
// But if we have an older cached page image, no need to go past that.
let lsn_floor = max(cached_lsn + 1, lsn_floor);
result = match layer
.get_value_reconstruct_data(key, lsn_floor..cont_lsn, reconstruct_state, ctx)
.await
{
Ok(result) => result,
Err(e) => return Err(PageReconstructError::from(e)),
};
cont_lsn = lsn_floor;
*read_count += 1;
traversal_path.push((result, cont_lsn, layer.traversal_id()));
continue 'outer;
} else if timeline.ancestor_timeline.is_some() {
// Nothing on this timeline. Traverse to parent
result = ValueReconstructResult::Continue;
cont_lsn = Lsn(timeline.ancestor_lsn.0 + 1);
continue 'outer;
} else {
// Nothing found
result = ValueReconstructResult::Missing;
continue 'outer;
}
}
}
#[allow(clippy::doc_lazy_continuation)]
/// Get the data needed to reconstruct all keys in the provided keyspace
///
@@ -3529,7 +3038,6 @@ impl Timeline {
cont_lsn,
request_lsn,
ancestor_lsn: Some(timeline.ancestor_lsn),
traversal_path: vec![],
backtrace: None,
}));
}
@@ -4122,17 +3630,11 @@ impl Timeline {
/// Return true if the value changed
///
/// This function must only be used from the layer flush task, and may not be called concurrently.
/// This function must only be used from the layer flush task.
fn set_disk_consistent_lsn(&self, new_value: Lsn) -> bool {
// We do a simple load/store cycle: that's why this function isn't safe for concurrent use.
let old_value = self.disk_consistent_lsn.load();
if new_value != old_value {
assert!(new_value >= old_value);
self.disk_consistent_lsn.store(new_value);
true
} else {
false
}
let old_value = self.disk_consistent_lsn.fetch_max(new_value);
assert!(new_value >= old_value, "disk_consistent_lsn must be growing monotonously at runtime; current {old_value}, offered {new_value}");
new_value != old_value
}
/// Update metadata file
@@ -4727,6 +4229,12 @@ impl Timeline {
return;
}
if self.current_logical_size.current_size().is_exact() {
// root timelines are initialized with exact count, but never start the background
// calculation
return;
}
if let Some(await_bg_cancel) = self
.current_logical_size
.cancel_wait_for_background_loop_concurrency_limit_semaphore
@@ -5697,6 +5205,22 @@ impl Timeline {
}
}
/// Persistently blocks gc for `Manual` reason.
///
/// Returns true if no such block existed before, false otherwise.
pub(crate) async fn block_gc(&self, tenant: &super::Tenant) -> anyhow::Result<bool> {
use crate::tenant::remote_timeline_client::index::GcBlockingReason;
assert_eq!(self.tenant_shard_id, tenant.tenant_shard_id);
tenant.gc_block.insert(self, GcBlockingReason::Manual).await
}
/// Persistently unblocks gc for `Manual` reason.
pub(crate) async fn unblock_gc(&self, tenant: &super::Tenant) -> anyhow::Result<()> {
use crate::tenant::remote_timeline_client::index::GcBlockingReason;
assert_eq!(self.tenant_shard_id, tenant.tenant_shard_id);
tenant.gc_block.remove(self, GcBlockingReason::Manual).await
}
#[cfg(test)]
pub(super) fn force_advance_lsn(self: &Arc<Timeline>, new_lsn: Lsn) {
self.last_record_lsn.advance(new_lsn);
@@ -5878,8 +5402,6 @@ impl Timeline {
}
}
type TraversalPathItem = (ValueReconstructResult, Lsn, TraversalId);
/// Tracking writes ingestion does to a particular in-memory layer.
///
/// Cleared upon freezing a layer.

View File

@@ -19,8 +19,10 @@ use bytes::Bytes;
use enumset::EnumSet;
use fail::fail_point;
use itertools::Itertools;
use pageserver_api::key::KEY_SIZE;
use pageserver_api::keyspace::ShardedRange;
use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
use serde::Serialize;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, info_span, trace, warn, Instrument};
use utils::id::TimelineId;
@@ -41,6 +43,7 @@ use crate::virtual_file::{MaybeFatalIo, VirtualFile};
use crate::keyspace::KeySpace;
use crate::repository::{Key, Value};
use crate::walrecord::NeonWalRecord;
use utils::lsn::Lsn;
@@ -73,6 +76,7 @@ impl KeyHistoryRetention {
key: Key,
delta_writer: &mut Vec<(Key, Lsn, Value)>,
mut image_writer: Option<&mut ImageLayerWriter>,
stat: &mut CompactionStatistics,
ctx: &RequestContext,
) -> anyhow::Result<()> {
let mut first_batch = true;
@@ -82,6 +86,7 @@ impl KeyHistoryRetention {
let Value::Image(img) = &logs[0].1 else {
unreachable!()
};
stat.produce_image_key(img);
if let Some(image_writer) = image_writer.as_mut() {
image_writer.put_image(key, img.clone(), ctx).await?;
} else {
@@ -89,24 +94,111 @@ impl KeyHistoryRetention {
}
} else {
for (lsn, val) in logs {
stat.produce_key(&val);
delta_writer.push((key, lsn, val));
}
}
first_batch = false;
} else {
for (lsn, val) in logs {
stat.produce_key(&val);
delta_writer.push((key, lsn, val));
}
}
}
let KeyLogAtLsn(above_horizon_logs) = self.above_horizon;
for (lsn, val) in above_horizon_logs {
stat.produce_key(&val);
delta_writer.push((key, lsn, val));
}
Ok(())
}
}
#[derive(Debug, Serialize, Default)]
struct CompactionStatisticsNumSize {
num: u64,
size: u64,
}
#[derive(Debug, Serialize, Default)]
pub struct CompactionStatistics {
delta_layer_visited: CompactionStatisticsNumSize,
image_layer_visited: CompactionStatisticsNumSize,
delta_layer_produced: CompactionStatisticsNumSize,
image_layer_produced: CompactionStatisticsNumSize,
num_delta_layer_discarded: usize,
num_image_layer_discarded: usize,
num_unique_keys_visited: usize,
wal_keys_visited: CompactionStatisticsNumSize,
image_keys_visited: CompactionStatisticsNumSize,
wal_produced: CompactionStatisticsNumSize,
image_produced: CompactionStatisticsNumSize,
}
impl CompactionStatistics {
fn estimated_size_of_value(val: &Value) -> usize {
match val {
Value::Image(img) => img.len(),
Value::WalRecord(NeonWalRecord::Postgres { rec, .. }) => rec.len(),
_ => std::mem::size_of::<NeonWalRecord>(),
}
}
fn estimated_size_of_key() -> usize {
KEY_SIZE // TODO: distinguish image layer and delta layer (count LSN in delta layer)
}
fn visit_delta_layer(&mut self, size: u64) {
self.delta_layer_visited.num += 1;
self.delta_layer_visited.size += size;
}
fn visit_image_layer(&mut self, size: u64) {
self.image_layer_visited.num += 1;
self.image_layer_visited.size += size;
}
fn on_unique_key_visited(&mut self) {
self.num_unique_keys_visited += 1;
}
fn visit_wal_key(&mut self, val: &Value) {
self.wal_keys_visited.num += 1;
self.wal_keys_visited.size +=
Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
}
fn visit_image_key(&mut self, val: &Value) {
self.image_keys_visited.num += 1;
self.image_keys_visited.size +=
Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
}
fn produce_key(&mut self, val: &Value) {
match val {
Value::Image(img) => self.produce_image_key(img),
Value::WalRecord(_) => self.produce_wal_key(val),
}
}
fn produce_wal_key(&mut self, val: &Value) {
self.wal_produced.num += 1;
self.wal_produced.size +=
Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
}
fn produce_image_key(&mut self, val: &Bytes) {
self.image_produced.num += 1;
self.image_produced.size += val.len() as u64 + Self::estimated_size_of_key() as u64;
}
fn discard_delta_layer(&mut self) {
self.num_delta_layer_discarded += 1;
}
fn discard_image_layer(&mut self) {
self.num_image_layer_discarded += 1;
}
fn produce_delta_layer(&mut self, size: u64) {
self.delta_layer_produced.num += 1;
self.delta_layer_produced.size += size;
}
fn produce_image_layer(&mut self, size: u64) {
self.image_layer_produced.num += 1;
self.image_layer_produced.size += size;
}
}
impl Timeline {
/// TODO: cancellation
///
@@ -118,12 +210,18 @@ impl Timeline {
ctx: &RequestContext,
) -> Result<bool, CompactionError> {
if flags.contains(CompactFlags::EnhancedGcBottomMostCompaction) {
self.compact_with_gc(cancel, ctx)
self.compact_with_gc(cancel, flags, ctx)
.await
.map_err(CompactionError::Other)?;
return Ok(false);
}
if flags.contains(CompactFlags::DryRun) {
return Err(CompactionError::Other(anyhow!(
"dry-run mode is not supported for legacy compaction for now"
)));
}
// High level strategy for compaction / image creation:
//
// 1. First, calculate the desired "partitioning" of the
@@ -1641,6 +1739,7 @@ impl Timeline {
pub(crate) async fn compact_with_gc(
self: &Arc<Self>,
cancel: &CancellationToken,
flags: EnumSet<CompactFlags>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
use std::collections::BTreeSet;
@@ -1664,12 +1763,16 @@ impl Timeline {
)
.await?;
info!("running enhanced gc bottom-most compaction");
let dry_run = flags.contains(CompactFlags::DryRun);
info!("running enhanced gc bottom-most compaction, dry_run={dry_run}");
scopeguard::defer! {
info!("done enhanced gc bottom-most compaction");
};
let mut stat = CompactionStatistics::default();
// Step 0: pick all delta layers + image layers below/intersect with the GC horizon.
// The layer selection has the following properties:
// 1. If a layer is in the selection, all layers below it are in the selection.
@@ -1740,6 +1843,9 @@ impl Timeline {
let key_range = desc.get_key_range();
delta_split_points.insert(key_range.start);
delta_split_points.insert(key_range.end);
stat.visit_delta_layer(desc.file_size());
} else {
stat.visit_image_layer(desc.file_size());
}
}
let mut delta_layers = Vec::new();
@@ -1775,6 +1881,8 @@ impl Timeline {
tline: &Arc<Timeline>,
lowest_retain_lsn: Lsn,
ctx: &RequestContext,
stats: &mut CompactionStatistics,
dry_run: bool,
last_batch: bool,
) -> anyhow::Result<Option<FlushDeltaResult>> {
// Check if we need to split the delta layer. We split at the original delta layer boundary to avoid
@@ -1831,6 +1939,7 @@ impl Timeline {
let layer_generation = guard.get_from_key(&delta_key).metadata().generation;
drop(guard);
if layer_generation == tline.generation {
stats.discard_delta_layer();
// TODO: depending on whether we design this compaction process to run along with
// other compactions, there could be layer map modifications after we drop the
// layer guard, and in case it creates duplicated layer key, we will still error
@@ -1857,6 +1966,10 @@ impl Timeline {
for (key, lsn, val) in deltas {
delta_layer_writer.put_value(key, lsn, val, ctx).await?;
}
stats.produce_delta_layer(delta_layer_writer.size());
if dry_run {
return Ok(None);
}
let delta_layer = delta_layer_writer
.finish(delta_key.key_range.end, tline, ctx)
.await?;
@@ -1951,6 +2064,13 @@ impl Timeline {
let mut current_delta_split_point = 0;
let mut delta_layers = Vec::new();
while let Some((key, lsn, val)) = merge_iter.next().await? {
if cancel.is_cancelled() {
return Err(anyhow!("cancelled")); // TODO: refactor to CompactionError and pass cancel error
}
match val {
Value::Image(_) => stat.visit_image_key(&val),
Value::WalRecord(_) => stat.visit_wal_key(&val),
}
if last_key.is_none() || last_key.as_ref() == Some(&key) {
if last_key.is_none() {
last_key = Some(key);
@@ -1958,6 +2078,7 @@ impl Timeline {
accumulated_values.push((key, lsn, val));
} else {
let last_key = last_key.as_mut().unwrap();
stat.on_unique_key_visited();
let retention = self
.generate_key_retention(
*last_key,
@@ -1974,6 +2095,7 @@ impl Timeline {
*last_key,
&mut delta_values,
image_layer_writer.as_mut(),
&mut stat,
ctx,
)
.await?;
@@ -1986,6 +2108,8 @@ impl Timeline {
self,
lowest_retain_lsn,
ctx,
&mut stat,
dry_run,
false,
)
.await?,
@@ -1998,6 +2122,7 @@ impl Timeline {
let last_key = last_key.expect("no keys produced during compaction");
// TODO: move this part to the loop body
stat.on_unique_key_visited();
let retention = self
.generate_key_retention(
last_key,
@@ -2014,6 +2139,7 @@ impl Timeline {
last_key,
&mut delta_values,
image_layer_writer.as_mut(),
&mut stat,
ctx,
)
.await?;
@@ -2026,6 +2152,8 @@ impl Timeline {
self,
lowest_retain_lsn,
ctx,
&mut stat,
dry_run,
true,
)
.await?,
@@ -2033,12 +2161,28 @@ impl Timeline {
assert!(delta_values.is_empty(), "unprocessed keys");
let image_layer = if discard_image_layer {
stat.discard_image_layer();
None
} else if let Some(writer) = image_layer_writer {
Some(writer.finish(self, ctx).await?)
stat.produce_image_layer(writer.size());
if !dry_run {
Some(writer.finish(self, ctx).await?)
} else {
None
}
} else {
None
};
info!(
"gc-compaction statistics: {}",
serde_json::to_string(&stat)?
);
if dry_run {
return Ok(());
}
info!(
"produced {} delta layers and {} image layers",
delta_layers.len(),
@@ -2062,6 +2206,7 @@ impl Timeline {
let mut layer_selection = layer_selection;
layer_selection.retain(|x| !keep_layers.contains(&x.layer_desc().key()));
compact_to.extend(image_layer);
// Step 3: Place back to the layer map.
{
let mut guard = self.layers.write().await;

View File

@@ -230,6 +230,8 @@ impl DeleteTimelineFlow {
// Now that the Timeline is in Stopping state, request all the related tasks to shut down.
timeline.shutdown(super::ShutdownMode::Hard).await;
tenant.gc_block.before_delete(&timeline);
fail::fail_point!("timeline-delete-before-index-deleted-at", |_| {
Err(anyhow::anyhow!(
"failpoint: timeline-delete-before-index-deleted-at"

View File

@@ -122,6 +122,10 @@ impl CurrentLogicalSize {
Self::Exact(_) => Accuracy::Exact,
}
}
pub(crate) fn is_exact(&self) -> bool {
matches!(self, Self::Exact(_))
}
}
impl LogicalSize {

View File

@@ -535,7 +535,7 @@ impl WalIngest {
// The page may be uninitialized. If so, we can't set the LSN because
// that would corrupt the page.
//
if !blk.opaque && !page_is_new(&image) {
if !page_is_new(&image) {
page_set_lsn(&mut image, lsn)
}
assert_eq!(image.len(), BLCKSZ as usize);

View File

@@ -129,7 +129,6 @@ pub struct DecodedBkpBlock {
pub apply_image: bool,
/* has image that should be restored */
pub will_init: bool,
pub opaque: bool,
/* record doesn't need previous page version to apply */
//char *bkp_image;
pub hole_offset: u16,
@@ -1001,7 +1000,6 @@ pub fn decode_wal_record(
blk.has_image = (fork_flags & pg_constants::BKPBLOCK_HAS_IMAGE) != 0;
blk.has_data = (fork_flags & pg_constants::BKPBLOCK_HAS_DATA) != 0;
blk.will_init = (fork_flags & pg_constants::BKPBLOCK_WILL_INIT) != 0;
blk.opaque = (fork_flags & pg_constants::BKPBLOCK_OPAQUE) != 0;
blk.data_len = buf.get_u16_le();
/* TODO cross-check that the HAS_DATA flag is set iff data_length > 0 */

16
poetry.lock generated
View File

@@ -1514,6 +1514,20 @@ files = [
[package.dependencies]
six = "*"
[[package]]
name = "kafka-python"
version = "2.0.2"
description = "Pure Python client for Apache Kafka"
optional = false
python-versions = "*"
files = [
{file = "kafka-python-2.0.2.tar.gz", hash = "sha256:04dfe7fea2b63726cd6f3e79a2d86e709d608d74406638c5da33a01d45a9d7e3"},
{file = "kafka_python-2.0.2-py2.py3-none-any.whl", hash = "sha256:2d92418c7cb1c298fa6c7f0fb3519b520d0d7526ac6cb7ae2a4fc65a51a94b6e"},
]
[package.extras]
crc32c = ["crc32c"]
[[package]]
name = "lazy-object-proxy"
version = "1.10.0"
@@ -3357,4 +3371,4 @@ cffi = ["cffi (>=1.11)"]
[metadata]
lock-version = "2.0"
python-versions = "^3.9"
content-hash = "7cee6a8c30bc7f4bfb0a87c6bad3952dfb4da127fad853d2710a93ac3eab8a00"
content-hash = "d569a3593b98baceb0a88e176bdad63cae99d6bfc2a81bf6741663a4abcafd72"

View File

@@ -1,8 +1,7 @@
[tool.poetry]
name = "neon"
version = "0.1.0"
description = ""
authors = []
package-mode = false
[tool.poetry.dependencies]
python = "^3.9"
@@ -42,6 +41,7 @@ httpx = {extras = ["http2"], version = "^0.26.0"}
pytest-repeat = "^0.9.3"
websockets = "^12.0"
clickhouse-connect = "^0.7.16"
kafka-python = "^2.0.2"
[tool.poetry.group.dev.dependencies]
mypy = "==1.3.0"
@@ -75,6 +75,7 @@ module = [
"allure.*",
"allure_commons.*",
"allure_pytest.*",
"kafka.*",
]
ignore_missing_imports = true

View File

@@ -7,6 +7,7 @@ use clap::{ArgAction, Parser};
use futures::future::BoxFuture;
use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt};
use metrics::launch_timestamp::{set_launch_timestamp_metric, LaunchTimestamp};
use remote_storage::RemoteStorageConfig;
use sd_notify::NotifyState;
use tokio::runtime::Handle;
@@ -204,6 +205,7 @@ fn opt_pathbuf_parser(s: &str) -> Result<Utf8PathBuf, String> {
#[tokio::main(flavor = "current_thread")]
async fn main() -> anyhow::Result<()> {
let launch_ts = Box::leak(Box::new(LaunchTimestamp::generate()));
// We want to allow multiple occurences of the same arg (taking the last) so
// that neon_local could generate command with defaults + overrides without
// getting 'argument cannot be used multiple times' error. This seems to be
@@ -356,14 +358,14 @@ async fn main() -> anyhow::Result<()> {
Some(GIT_VERSION.into()),
&[("node_id", &conf.my_id.to_string())],
);
start_safekeeper(conf).await
start_safekeeper(launch_ts, conf).await
}
/// Result of joining any of main tasks: upper error means task failed to
/// complete, e.g. panicked, inner is error produced by task itself.
type JoinTaskRes = Result<anyhow::Result<()>, JoinError>;
async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
async fn start_safekeeper(launch_ts: &'static LaunchTimestamp, conf: SafeKeeperConf) -> Result<()> {
// Prevent running multiple safekeepers on the same directory
let lock_file_path = conf.workdir.join(PID_FILE_NAME);
let lock_file =
@@ -491,6 +493,7 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
tasks_handles.push(Box::pin(broker_task_handle));
set_build_info_metric(GIT_VERSION, BUILD_TAG);
set_launch_timestamp_metric(launch_ts);
// TODO: update tokio-stream, convert to real async Stream with
// SignalStream, map it to obtain missing signal name, combine streams into

View File

@@ -92,7 +92,7 @@ pub(crate) async fn branch_cleanup_and_check_errors(
.push(format!("index_part.json version: {}", index_part.version()))
}
let mut newest_versions = IndexPart::KNOWN_VERSIONS.iter().rev().take(2);
let mut newest_versions = IndexPart::KNOWN_VERSIONS.iter().rev().take(3);
if !newest_versions.any(|ip| ip == &index_part.version()) {
info!(
"index_part.json version is not latest: {}",

View File

@@ -4,7 +4,7 @@ use anyhow::{anyhow, Context};
use async_stream::{stream, try_stream};
use aws_sdk_s3::{types::ObjectIdentifier, Client};
use futures::StreamExt;
use remote_storage::{GenericRemoteStorage, ListingMode};
use remote_storage::{GenericRemoteStorage, ListingMode, ListingObject, RemotePath};
use tokio_stream::Stream;
use crate::{
@@ -276,3 +276,33 @@ pub(crate) fn stream_listing<'a>(
}
}
}
pub(crate) fn stream_listing_generic<'a>(
remote_client: &'a GenericRemoteStorage,
target: &'a S3Target,
) -> impl Stream<Item = anyhow::Result<(RemotePath, Option<ListingObject>)>> + 'a {
let listing_mode = if target.delimiter.is_empty() {
ListingMode::NoDelimiter
} else {
ListingMode::WithDelimiter
};
try_stream! {
let mut objects_stream = std::pin::pin!(stream_objects_with_retries(
remote_client,
listing_mode,
target,
));
while let Some(list) = objects_stream.next().await {
let list = list?;
if target.delimiter.is_empty() {
for key in list.keys {
yield (key.key.clone(), Some(key));
}
} else {
for key in list.prefixes {
yield (key, None);
}
}
}
}
}

View File

@@ -1,10 +1,10 @@
use std::{collections::HashSet, str::FromStr, sync::Arc};
use aws_sdk_s3::Client;
use futures::stream::{StreamExt, TryStreamExt};
use once_cell::sync::OnceCell;
use pageserver_api::shard::TenantShardId;
use postgres_ffi::{XLogFileName, PG_TLI};
use remote_storage::GenericRemoteStorage;
use serde::Serialize;
use tokio_postgres::types::PgLsn;
use tracing::{error, info, trace};
@@ -14,8 +14,9 @@ use utils::{
};
use crate::{
cloud_admin_api::CloudAdminApiClient, init_remote, metadata_stream::stream_listing,
BucketConfig, ConsoleConfig, NodeKind, RootTarget, TenantShardTimelineId,
cloud_admin_api::CloudAdminApiClient, init_remote_generic,
metadata_stream::stream_listing_generic, BucketConfig, ConsoleConfig, NodeKind, RootTarget,
TenantShardTimelineId,
};
/// Generally we should ask safekeepers, but so far we use everywhere default 16MB.
@@ -106,7 +107,7 @@ pub async fn scan_safekeeper_metadata(
let timelines = client.query(&query, &[]).await?;
info!("loaded {} timelines", timelines.len());
let (s3_client, target) = init_remote(bucket_config, NodeKind::Safekeeper).await?;
let (remote_client, target) = init_remote_generic(bucket_config, NodeKind::Safekeeper).await?;
let console_config = ConsoleConfig::from_env()?;
let cloud_admin_api_client = CloudAdminApiClient::new(console_config);
@@ -119,7 +120,7 @@ pub async fn scan_safekeeper_metadata(
let backup_lsn: Lsn = Lsn(u64::from(backup_lsn_pg));
let ttid = TenantTimelineId::new(tenant_id, timeline_id);
check_timeline(
&s3_client,
&remote_client,
&target,
&cloud_admin_api_client,
ttid,
@@ -156,7 +157,7 @@ struct TimelineCheckResult {
/// errors are logged to stderr; returns Ok(true) if timeline is consistent,
/// Ok(false) if not, Err if failed to check.
async fn check_timeline(
s3_client: &Client,
remote_client: &GenericRemoteStorage,
root: &RootTarget,
api_client: &CloudAdminApiClient,
ttid: TenantTimelineId,
@@ -187,12 +188,13 @@ async fn check_timeline(
// we need files, so unset it.
timeline_dir_target.delimiter = String::new();
let mut stream = std::pin::pin!(stream_listing(s3_client, &timeline_dir_target));
let mut stream = std::pin::pin!(stream_listing_generic(remote_client, &timeline_dir_target));
while let Some(obj) = stream.next().await {
let obj = obj?;
let key = obj.key();
let (key, _obj) = obj?;
let seg_name = key
.get_path()
.as_str()
.strip_prefix(&timeline_dir_target.prefix_in_bucket)
.expect("failed to extract segment name");
expected_segfiles.remove(seg_name);

View File

@@ -285,9 +285,9 @@ class NeonApiEndpoint:
self.project_id = project_id
eps = neon_api.get_endpoints(project_id)["endpoints"]
self.endpoint_id = eps[0]["id"]
self.connstr = neon_api.get_connection_uri(project_id, endpoint_id=self.endpoint_id)[
"uri"
]
self.connstr = neon_api.get_connection_uri(
project_id, endpoint_id=self.endpoint_id, pooled=False
)["uri"]
pw = self.connstr.split("@")[0].split(":")[-1]
self.pgbench_env = {
"PGHOST": eps[0]["host"],

View File

@@ -556,6 +556,22 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
assert isinstance(res_json, dict)
return res_json
def timeline_block_gc(self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId):
res = self.post(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/block_gc",
)
log.info(f"Got GC request response code: {res.status_code}")
self.verbose_error(res)
def timeline_unblock_gc(
self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId
):
res = self.post(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/unblock_gc",
)
log.info(f"Got GC request response code: {res.status_code}")
self.verbose_error(res)
def timeline_compact(
self,
tenant_id: Union[TenantId, TenantShardId],

View File

@@ -389,7 +389,10 @@ WaitUntilRet = TypeVar("WaitUntilRet")
def wait_until(
number_of_iterations: int, interval: float, func: Callable[[], WaitUntilRet]
number_of_iterations: int,
interval: float,
func: Callable[[], WaitUntilRet],
show_intermediate_error=False,
) -> WaitUntilRet:
"""
Wait until 'func' returns successfully, without exception. Returns the
@@ -402,6 +405,8 @@ def wait_until(
except Exception as e:
log.info("waiting for %s iteration %s failed", func, i + 1)
last_exception = e
if show_intermediate_error:
log.info(e)
time.sleep(interval)
continue
return res

View File

@@ -0,0 +1,22 @@
# Logical replication tests
## Clickhouse
```bash
export BENCHMARK_CONNSTR=postgres://user:pass@ep-abc-xyz-123.us-east-2.aws.neon.build/neondb
docker compose -f clickhouse/docker-compose.yml up -d
pytest -m remote_cluster -k test_clickhouse
docker compose -f clickhouse/docker-compose.yml down
```
## Debezium
```bash
export BENCHMARK_CONNSTR=postgres://user:pass@ep-abc-xyz-123.us-east-2.aws.neon.build/neondb
docker compose -f debezium/docker-compose.yml up -d
pytest -m remote_cluster -k test_debezium
docker compose -f debezium/docker-compose.yml down
```

View File

@@ -0,0 +1,9 @@
services:
clickhouse:
image: clickhouse/clickhouse-server
user: "101:101"
container_name: clickhouse
hostname: clickhouse
ports:
- 127.0.0.1:8123:8123
- 127.0.0.1:9000:9000

View File

@@ -0,0 +1,24 @@
services:
zookeeper:
image: quay.io/debezium/zookeeper:2.7
kafka:
image: quay.io/debezium/kafka:2.7
environment:
ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_BROKER_ID: 1
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_JMX_PORT: 9991
ports:
- 127.0.0.1:9092:9092
debezium:
image: quay.io/debezium/connect:2.7
environment:
BOOTSTRAP_SERVERS: kafka:9092
GROUP_ID: 1
CONFIG_STORAGE_TOPIC: debezium-config
OFFSET_STORAGE_TOPIC: debezium-offset
STATUS_STORAGE_TOPIC: debezium-status
DEBEZIUM_CONFIG_CONNECTOR_CLASS: io.debezium.connector.postgresql.PostgresConnector
ports:
- 127.0.0.1:8083:8083

View File

@@ -1,8 +1,9 @@
"""
Test the logical replication in Neon with the different consumers
Test the logical replication in Neon with ClickHouse as a consumer
"""
import hashlib
import os
import time
import clickhouse_connect
@@ -39,22 +40,15 @@ def test_clickhouse(remote_pg: RemotePostgres):
"""
Test the logical replication having ClickHouse as a client
"""
clickhouse_host = "clickhouse" if ("CI" in os.environ) else "127.0.0.1"
conn_options = remote_pg.conn_options()
for _ in range(5):
try:
conn = psycopg2.connect(remote_pg.connstr())
except psycopg2.OperationalError as perr:
log.debug(perr)
time.sleep(1)
else:
break
raise TimeoutError
conn = psycopg2.connect(remote_pg.connstr())
cur = conn.cursor()
cur.execute("DROP TABLE IF EXISTS table1")
cur.execute("CREATE TABLE table1 (id integer primary key, column1 varchar(10));")
cur.execute("INSERT INTO table1 (id, column1) VALUES (1, 'abc'), (2, 'def');")
conn.commit()
client = clickhouse_connect.get_client(host="clickhouse")
client = clickhouse_connect.get_client(host=clickhouse_host)
client.command("SET allow_experimental_database_materialized_postgresql=1")
client.command(
"CREATE DATABASE db1_postgres ENGINE = "

View File

@@ -0,0 +1,189 @@
"""
Test the logical replication in Neon with Debezium as a consumer
"""
import json
import os
import time
import psycopg2
import pytest
import requests
from fixtures.log_helper import log
from fixtures.neon_fixtures import RemotePostgres
from fixtures.utils import wait_until
from kafka import KafkaConsumer
class DebeziumAPI:
"""
The class for Debezium API calls
"""
def __init__(self):
self.__host = "debezium" if ("CI" in os.environ) else "127.0.0.1"
self.__base_url = f"http://{self.__host}:8083"
self.__connectors_url = f"{self.__base_url}/connectors"
def __request(self, method, addurl="", **kwargs):
return requests.request(
method,
self.__connectors_url + addurl,
headers={"Accept": "application/json", "Content-type": "application/json"},
timeout=60,
**kwargs,
)
def create_pg_connector(self, remote_pg: RemotePostgres, dbz_conn_name: str):
"""
Create a Postgres connector in debezium
"""
conn_options = remote_pg.conn_options()
payload = {
"name": dbz_conn_name,
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": conn_options["host"],
"database.port": "5432",
"database.user": conn_options["user"],
"database.password": conn_options["password"],
"database.dbname": conn_options["dbname"],
"plugin.name": "pgoutput",
"topic.prefix": "dbserver1",
"schema.include.list": "inventory",
},
}
return self.__request("POST", json=payload)
def list_connectors(self):
"""
Returns a list of all connectors existent in Debezium.
"""
resp = self.__request("GET")
assert resp.ok
return json.loads(resp.text)
def del_connector(self, connector):
"""
Deletes the specified connector
"""
return self.__request("DELETE", f"/{connector}")
@pytest.fixture(scope="function")
def debezium(remote_pg: RemotePostgres):
"""
Prepare the Debezium API handler, connection
"""
conn = psycopg2.connect(remote_pg.connstr())
cur = conn.cursor()
cur.execute("DROP SCHEMA IF EXISTS inventory CASCADE")
cur.execute("CREATE SCHEMA inventory")
cur.execute(
"CREATE TABLE inventory.customers ("
"id SERIAL NOT NULL PRIMARY KEY,"
"first_name character varying(255) NOT NULL,"
"last_name character varying(255) NOT NULL,"
"email character varying(255) NOT NULL)"
)
conn.commit()
dbz = DebeziumAPI()
assert len(dbz.list_connectors()) == 0
dbz_conn_name = "inventory-connector"
resp = dbz.create_pg_connector(remote_pg, dbz_conn_name)
log.debug("%s %s %s", resp.status_code, resp.ok, resp.text)
assert resp.status_code == 201
assert len(dbz.list_connectors()) == 1
consumer = KafkaConsumer(
"dbserver1.inventory.customers",
bootstrap_servers=["kafka:9092"],
auto_offset_reset="earliest",
enable_auto_commit=False,
)
yield conn, consumer
resp = dbz.del_connector(dbz_conn_name)
assert resp.status_code == 204
def get_kafka_msg(consumer, ts_ms, before=None, after=None) -> None:
"""
Gets the message from Kafka and checks its validity
Arguments:
consumer: the consumer object
ts_ms: timestamp in milliseconds of the change of db, the corresponding message must have
the later timestamp
before: a dictionary, if not None, the before field from the kafka message must
have the same values for the same keys
after: a dictionary, if not None, the after field from the kafka message must
have the same values for the same keys
"""
msg = consumer.poll()
assert msg, "Empty message"
for val in msg.values():
r = json.loads(val[-1].value)
log.info(r["payload"])
assert ts_ms < r["payload"]["ts_ms"], "Incorrect timestamp"
for param, pname in ((before, "before"), (after, "after")):
if param is not None:
for k, v in param.items():
assert r["payload"][pname][k] == v, f"{pname} mismatches"
@pytest.mark.remote_cluster
def test_debezium(debezium):
"""
Test the logical replication having Debezium as a subscriber
"""
conn, consumer = debezium
cur = conn.cursor()
ts_ms = time.time() * 1000
log.info("Insert 1 ts_ms: %s", ts_ms)
cur.execute(
"insert into inventory.customers (first_name, last_name, email) "
"values ('John', 'Dow','johndow@example.com')"
)
conn.commit()
wait_until(
100,
0.5,
lambda: get_kafka_msg(
consumer,
ts_ms,
after={"first_name": "John", "last_name": "Dow", "email": "johndow@example.com"},
),
show_intermediate_error=True,
)
ts_ms = time.time() * 1000
log.info("Insert 2 ts_ms: %s", ts_ms)
cur.execute(
"insert into inventory.customers (first_name, last_name, email) "
"values ('Alex', 'Row','alexrow@example.com')"
)
conn.commit()
wait_until(
100,
0.5,
lambda: get_kafka_msg(
consumer,
ts_ms,
after={"first_name": "Alex", "last_name": "Row", "email": "alexrow@example.com"},
),
show_intermediate_error=True,
)
ts_ms = time.time() * 1000
log.info("Update ts_ms: %s", ts_ms)
cur.execute("update inventory.customers set first_name = 'Alexander' where id = 2")
conn.commit()
wait_until(
100,
0.5,
lambda: get_kafka_msg(
consumer,
ts_ms,
after={"first_name": "Alexander"},
),
show_intermediate_error=True,
)
time.sleep(3)
cur.execute("select 1")

View File

@@ -100,24 +100,32 @@ def test_subscriber_lag(
pub_connstr = benchmark_project_pub.connstr
sub_connstr = benchmark_project_sub.connstr
pg_bin.run_capture(["pgbench", "-i", "-s100"], env=pub_env)
pg_bin.run_capture(["pgbench", "-i", "-s100"], env=sub_env)
if benchmark_project_pub.is_new:
pg_bin.run_capture(["pgbench", "-i", "-s100"], env=pub_env)
if benchmark_project_sub.is_new:
pg_bin.run_capture(["pgbench", "-i", "-s100"], env=sub_env)
pub_conn = psycopg2.connect(pub_connstr)
sub_conn = psycopg2.connect(sub_connstr)
pub_conn.autocommit = True
sub_conn.autocommit = True
with pub_conn.cursor() as pub_cur, sub_conn.cursor() as sub_cur:
if benchmark_project_pub.is_new:
pub_cur.execute("create publication pub1 for table pgbench_accounts, pgbench_history")
pub_cur.execute("SELECT 1 FROM pg_catalog.pg_publication WHERE pubname = 'pub1'")
pub_exists = len(pub_cur.fetchall()) != 0
if benchmark_project_sub.is_new:
if not pub_exists:
pub_cur.execute("CREATE PUBLICATION pub1 FOR TABLE pgbench_accounts, pgbench_history")
sub_cur.execute("SELECT 1 FROM pg_catalog.pg_subscription WHERE subname = 'sub1'")
sub_exists = len(sub_cur.fetchall()) != 0
if not sub_exists:
sub_cur.execute("truncate table pgbench_accounts")
sub_cur.execute("truncate table pgbench_history")
sub_cur.execute(f"create subscription sub1 connection '{pub_connstr}' publication pub1")
sub_cur.execute(f"CREATE SUBSCRIPTION sub1 CONNECTION '{pub_connstr}' PUBLICATION pub1")
initial_sync_lag = measure_logical_replication_lag(sub_cur, pub_cur)
pub_conn.close()
sub_conn.close()
@@ -195,10 +203,15 @@ def test_publisher_restart(
pub_conn.autocommit = True
sub_conn.autocommit = True
with pub_conn.cursor() as pub_cur, sub_conn.cursor() as sub_cur:
if benchmark_project_pub.is_new:
pub_cur.execute("SELECT 1 FROM pg_catalog.pg_publication WHERE pubname = 'pub1'")
pub_exists = len(pub_cur.fetchall()) != 0
if not pub_exists:
pub_cur.execute("create publication pub1 for table pgbench_accounts, pgbench_history")
if benchmark_project_sub.is_new:
sub_cur.execute("SELECT 1 FROM pg_catalog.pg_subscription WHERE subname = 'sub1'")
sub_exists = len(sub_cur.fetchall()) != 0
if not sub_exists:
sub_cur.execute("truncate table pgbench_accounts")
sub_cur.execute("truncate table pgbench_history")

View File

@@ -0,0 +1,67 @@
import time
from fixtures.neon_fixtures import (
NeonEnvBuilder,
)
from fixtures.pageserver.utils import wait_timeline_detail_404
def test_gc_blocking_by_timeline(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start(
initial_tenant_conf={"gc_period": "1s", "lsn_lease_length": "0s"}
)
ps = env.pageserver
http = ps.http_client()
foo_branch = env.neon_cli.create_branch("foo", "main", env.initial_tenant)
gc_active_line = ".* gc_loop.*: [12] timelines need GC"
gc_skipped_line = ".* gc_loop.*: Skipping GC: .*"
init_gc_skipped = ".*: initialized with gc blocked.*"
tenant_before = http.tenant_status(env.initial_tenant)
wait_for_another_gc_round()
_, offset = ps.assert_log_contains(gc_active_line)
assert ps.log_contains(gc_skipped_line, offset) is None
http.timeline_block_gc(env.initial_tenant, foo_branch)
tenant_after = http.tenant_status(env.initial_tenant)
assert tenant_before != tenant_after
gc_blocking = tenant_after["gc_blocking"]
assert gc_blocking == "BlockingReasons { timelines: 1, reasons: EnumSet(Manual) }"
wait_for_another_gc_round()
_, offset = ps.assert_log_contains(gc_skipped_line, offset)
ps.restart()
ps.quiesce_tenants()
_, offset = env.pageserver.assert_log_contains(init_gc_skipped, offset)
wait_for_another_gc_round()
_, offset = ps.assert_log_contains(gc_skipped_line, offset)
# deletion unblocks gc
http.timeline_delete(env.initial_tenant, foo_branch)
wait_timeline_detail_404(http, env.initial_tenant, foo_branch, 10, 1.0)
wait_for_another_gc_round()
_, offset = ps.assert_log_contains(gc_active_line, offset)
http.timeline_block_gc(env.initial_tenant, env.initial_timeline)
wait_for_another_gc_round()
_, offset = ps.assert_log_contains(gc_skipped_line, offset)
# removing the manual block also unblocks gc
http.timeline_unblock_gc(env.initial_tenant, env.initial_timeline)
wait_for_another_gc_round()
_, offset = ps.assert_log_contains(gc_active_line, offset)
def wait_for_another_gc_round():
time.sleep(2)

View File

@@ -936,6 +936,9 @@ def test_timeline_logical_size_task_priority(neon_env_builder: NeonEnvBuilder):
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
# just make sure this doesn't hit an assertion
client.timeline_detail(tenant_id, timeline_id, force_await_initial_logical_size=True)
# load in some data
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
endpoint.safe_psql_many(

View File

@@ -1,5 +1,5 @@
{
"v16": ["16.3", "60fab0e62ca0150276bf03231cc1339b29d3465c"],
"v16": ["16.3", "5377f5ed7290af45b7cb6b0d98d43cbf4a4e77f3"],
"v15": ["15.7", "9eba7dd382606ffca43aca865f337ec21bcdac73"],
"v14": ["14.12", "7bbe834c8c2dc37802eca8484311599bc47341f6"]
}