mirror of
https://github.com/neondatabase/neon.git
synced 2026-02-19 10:30:37 +00:00
Compare commits
22 Commits
jcsp/inges
...
arpad/safe
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a3ca7eeb23 | ||
|
|
078f941dc8 | ||
|
|
68bcbf8227 | ||
|
|
a31c95cb40 | ||
|
|
dc7eb5ae5a | ||
|
|
44fedfd6c3 | ||
|
|
138f008bab | ||
|
|
6a6f30e378 | ||
|
|
8f3bc5ae35 | ||
|
|
e6e578821b | ||
|
|
c32807ac19 | ||
|
|
50daff9655 | ||
|
|
bd845c7587 | ||
|
|
f63c8e5a8c | ||
|
|
200fa56b04 | ||
|
|
0f3dac265b | ||
|
|
1dc496a2c9 | ||
|
|
6814bdd30b | ||
|
|
0a667bc8ef | ||
|
|
f3acfb2d80 | ||
|
|
8c828c586e | ||
|
|
2334fed762 |
2
.github/actionlint.yml
vendored
2
.github/actionlint.yml
vendored
@@ -8,6 +8,8 @@ self-hosted-runner:
|
||||
- small-arm64
|
||||
- us-east-2
|
||||
config-variables:
|
||||
- BENCHMARK_PROJECT_ID_PUB
|
||||
- BENCHMARK_PROJECT_ID_SUB
|
||||
- REMOTE_STORAGE_AZURE_CONTAINER
|
||||
- REMOTE_STORAGE_AZURE_REGION
|
||||
- SLACK_UPCOMING_RELEASE_CHANNEL_ID
|
||||
|
||||
9
.github/workflows/benchmarking.yml
vendored
9
.github/workflows/benchmarking.yml
vendored
@@ -147,7 +147,7 @@ jobs:
|
||||
if: ${{ github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null }}
|
||||
env:
|
||||
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
|
||||
DEFAULT_PG_VERSION: 14
|
||||
DEFAULT_PG_VERSION: 16
|
||||
TEST_OUTPUT: /tmp/test_output
|
||||
BUILD_TYPE: remote
|
||||
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref_name == 'main' ) }}
|
||||
@@ -168,7 +168,7 @@ jobs:
|
||||
path: /tmp/neon/
|
||||
prefix: latest
|
||||
|
||||
- name: Run benchmark
|
||||
- name: Run Logical Replication benchmarks
|
||||
uses: ./.github/actions/run-python-test-set
|
||||
with:
|
||||
build_type: ${{ env.BUILD_TYPE }}
|
||||
@@ -176,12 +176,15 @@ jobs:
|
||||
run_in_parallel: false
|
||||
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
|
||||
extra_params: -m remote_cluster --timeout 5400
|
||||
pg_version: ${{ env.DEFAULT_PG_VERSION }}
|
||||
env:
|
||||
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
|
||||
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
|
||||
NEON_API_KEY: ${{ secrets.NEON_STAGING_API_KEY }}
|
||||
BENCHMARK_PROJECT_ID_PUB: ${{ vars.BENCHMARK_PROJECT_ID_PUB }}
|
||||
BENCHMARK_PROJECT_ID_SUB: ${{ vars.BENCHMARK_PROJECT_ID_SUB }}
|
||||
|
||||
- name: Run benchmark
|
||||
- name: Run Physical Replication benchmarks
|
||||
uses: ./.github/actions/run-python-test-set
|
||||
with:
|
||||
build_type: ${{ env.BUILD_TYPE }}
|
||||
|
||||
26
.github/workflows/pg-clients.yml
vendored
26
.github/workflows/pg-clients.yml
vendored
@@ -66,7 +66,31 @@ jobs:
|
||||
ports:
|
||||
- 9000:9000
|
||||
- 8123:8123
|
||||
|
||||
zookeeper:
|
||||
image: quay.io/debezium/zookeeper:2.7
|
||||
ports:
|
||||
- 2181:2181
|
||||
kafka:
|
||||
image: quay.io/debezium/kafka:2.7
|
||||
env:
|
||||
ZOOKEEPER_CONNECT: "zookeeper:2181"
|
||||
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
|
||||
KAFKA_BROKER_ID: 1
|
||||
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
|
||||
KAFKA_JMX_PORT: 9991
|
||||
ports:
|
||||
- 9092:9092
|
||||
debezium:
|
||||
image: quay.io/debezium/connect:2.7
|
||||
env:
|
||||
BOOTSTRAP_SERVERS: kafka:9092
|
||||
GROUP_ID: 1
|
||||
CONFIG_STORAGE_TOPIC: debezium-config
|
||||
OFFSET_STORAGE_TOPIC: debezium-offset
|
||||
STATUS_STORAGE_TOPIC: debezium-status
|
||||
DEBEZIUM_CONFIG_CONNECTOR_CLASS: io.debezium.connector.postgresql.PostgresConnector
|
||||
ports:
|
||||
- 8083:8083
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
|
||||
|
||||
38
.github/workflows/trigger-e2e-tests.yml
vendored
38
.github/workflows/trigger-e2e-tests.yml
vendored
@@ -13,8 +13,6 @@ defaults:
|
||||
env:
|
||||
# A concurrency group that we use for e2e-tests runs, matches `concurrency.group` above with `github.repository` as a prefix
|
||||
E2E_CONCURRENCY_GROUP: ${{ github.repository }}-e2e-tests-${{ github.ref_name }}-${{ github.ref_name == 'main' && github.sha || 'anysha' }}
|
||||
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_DEV }}
|
||||
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_KEY_DEV }}
|
||||
|
||||
jobs:
|
||||
cancel-previous-e2e-tests:
|
||||
@@ -64,19 +62,35 @@ jobs:
|
||||
needs: [ tag ]
|
||||
runs-on: ubuntu-22.04
|
||||
env:
|
||||
EVENT_ACTION: ${{ github.event.action }}
|
||||
GH_TOKEN: ${{ secrets.CI_ACCESS_TOKEN }}
|
||||
TAG: ${{ needs.tag.outputs.build-tag }}
|
||||
steps:
|
||||
- name: check if ecr image are present
|
||||
env:
|
||||
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_DEV }}
|
||||
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_KEY_DEV }}
|
||||
- name: Wait for `promote-images` job to finish
|
||||
# It's important to have a timeout here, the script in the step can run infinitely
|
||||
timeout-minutes: 60
|
||||
run: |
|
||||
for REPO in neon compute-tools compute-node-v14 vm-compute-node-v14 compute-node-v15 vm-compute-node-v15 compute-node-v16 vm-compute-node-v16; do
|
||||
OUTPUT=$(aws ecr describe-images --repository-name ${REPO} --region eu-central-1 --query "imageDetails[?imageTags[?contains(@, '${TAG}')]]" --output text)
|
||||
if [ "$OUTPUT" == "" ]; then
|
||||
echo "$REPO with image tag $TAG not found" >> $GITHUB_OUTPUT
|
||||
exit 1
|
||||
fi
|
||||
if [ "${GITHUB_EVENT_NAME}" != "pull_request" ] || [ "${EVENT_ACTION}" != "ready_for_review" ]; then
|
||||
exit 0
|
||||
fi
|
||||
|
||||
# For PRs we use the run id as the tag
|
||||
BUILD_AND_TEST_RUN_ID=${TAG}
|
||||
while true; do
|
||||
conclusion=$(gh run --repo ${GITHUB_REPOSITORY} view ${BUILD_AND_TEST_RUN_ID} --json jobs --jq '.jobs[] | select(.name == "promote-images") | .conclusion')
|
||||
case "$conclusion" in
|
||||
success)
|
||||
break
|
||||
;;
|
||||
failure | cancelled | skipped)
|
||||
echo "The 'promote-images' job didn't succeed: '${conclusion}'. Exiting..."
|
||||
exit 1
|
||||
;;
|
||||
*)
|
||||
echo "The 'promote-images' hasn't succeed yet. Waiting..."
|
||||
sleep 60
|
||||
;;
|
||||
esac
|
||||
done
|
||||
|
||||
- name: Set e2e-platforms
|
||||
|
||||
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -5703,6 +5703,7 @@ dependencies = [
|
||||
"pageserver_client",
|
||||
"postgres_connection",
|
||||
"r2d2",
|
||||
"rand 0.8.5",
|
||||
"reqwest 0.12.4",
|
||||
"routerify",
|
||||
"scopeguard",
|
||||
|
||||
@@ -4,6 +4,11 @@ version = "0.1.0"
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[features]
|
||||
default = []
|
||||
# Enables test specific features.
|
||||
testing = []
|
||||
|
||||
[dependencies]
|
||||
anyhow.workspace = true
|
||||
async-compression.workspace = true
|
||||
|
||||
@@ -400,7 +400,15 @@ impl ComputeNode {
|
||||
pub fn get_basebackup(&self, compute_state: &ComputeState, lsn: Lsn) -> Result<()> {
|
||||
let mut retry_period_ms = 500.0;
|
||||
let mut attempts = 0;
|
||||
let max_attempts = 10;
|
||||
const DEFAULT_ATTEMPTS: u16 = 10;
|
||||
#[cfg(feature = "testing")]
|
||||
let max_attempts = if let Ok(v) = env::var("NEON_COMPUTE_TESTING_BASEBACKUP_RETRIES") {
|
||||
u16::from_str(&v).unwrap()
|
||||
} else {
|
||||
DEFAULT_ATTEMPTS
|
||||
};
|
||||
#[cfg(not(feature = "testing"))]
|
||||
let max_attempts = DEFAULT_ATTEMPTS;
|
||||
loop {
|
||||
let result = self.try_get_basebackup(compute_state, lsn);
|
||||
match result {
|
||||
|
||||
@@ -289,7 +289,7 @@ fn fill_remote_storage_secrets_vars(mut cmd: &mut Command) -> &mut Command {
|
||||
|
||||
fn fill_env_vars_prefixed_neon(mut cmd: &mut Command) -> &mut Command {
|
||||
for (var, val) in std::env::vars() {
|
||||
if var.starts_with("NEON_PAGESERVER_") {
|
||||
if var.starts_with("NEON_") {
|
||||
cmd = cmd.env(var, val);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -637,6 +637,13 @@ pub struct TenantInfo {
|
||||
pub current_physical_size: Option<u64>, // physical size is only included in `tenant_status` endpoint
|
||||
pub attachment_status: TenantAttachmentStatus,
|
||||
pub generation: u32,
|
||||
|
||||
/// Opaque explanation if gc is being blocked.
|
||||
///
|
||||
/// Only looked up for the individual tenant detail, not the listing. This is purely for
|
||||
/// debugging, not included in openapi.
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub gc_blocking: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone)]
|
||||
@@ -1427,6 +1434,7 @@ mod tests {
|
||||
current_physical_size: Some(42),
|
||||
attachment_status: TenantAttachmentStatus::Attached,
|
||||
generation: 1,
|
||||
gc_blocking: None,
|
||||
};
|
||||
let expected_active = json!({
|
||||
"id": original_active.id.to_string(),
|
||||
@@ -1449,6 +1457,7 @@ mod tests {
|
||||
current_physical_size: Some(42),
|
||||
attachment_status: TenantAttachmentStatus::Attached,
|
||||
generation: 1,
|
||||
gc_blocking: None,
|
||||
};
|
||||
let expected_broken = json!({
|
||||
"id": original_broken.id.to_string(),
|
||||
|
||||
@@ -308,6 +308,45 @@ paths:
|
||||
application/json:
|
||||
schema:
|
||||
type: string
|
||||
|
||||
/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/block_gc:
|
||||
parameters:
|
||||
- name: tenant_shard_id
|
||||
in: path
|
||||
required: true
|
||||
schema:
|
||||
type: string
|
||||
- name: timeline_id
|
||||
in: path
|
||||
required: true
|
||||
schema:
|
||||
type: string
|
||||
format: hex
|
||||
post:
|
||||
description: Persistently add a gc blocking at the tenant level because of this timeline
|
||||
responses:
|
||||
"200":
|
||||
description: OK
|
||||
|
||||
/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/unblock_gc:
|
||||
parameters:
|
||||
- name: tenant_shard_id
|
||||
in: path
|
||||
required: true
|
||||
schema:
|
||||
type: string
|
||||
- name: timeline_id
|
||||
in: path
|
||||
required: true
|
||||
schema:
|
||||
type: string
|
||||
format: hex
|
||||
post:
|
||||
description: Persistently remove a tenant level gc blocking for this timeline
|
||||
responses:
|
||||
"200":
|
||||
description: OK
|
||||
|
||||
/v1/tenant/{tenant_shard_id}/location_config:
|
||||
parameters:
|
||||
- name: tenant_shard_id
|
||||
|
||||
@@ -935,6 +935,7 @@ async fn tenant_list_handler(
|
||||
generation: (*gen)
|
||||
.into()
|
||||
.expect("Tenants are always attached with a generation"),
|
||||
gc_blocking: None,
|
||||
})
|
||||
.collect::<Vec<TenantInfo>>();
|
||||
|
||||
@@ -986,6 +987,7 @@ async fn tenant_status(
|
||||
.generation()
|
||||
.into()
|
||||
.expect("Tenants are always attached with a generation"),
|
||||
gc_blocking: tenant.gc_block.summary().map(|x| format!("{x:?}")),
|
||||
},
|
||||
walredo: tenant.wal_redo_manager_status(),
|
||||
timelines: tenant.list_timeline_ids(),
|
||||
@@ -1226,6 +1228,72 @@ async fn evict_timeline_layer_handler(
|
||||
}
|
||||
}
|
||||
|
||||
async fn timeline_gc_blocking_handler(
|
||||
request: Request<Body>,
|
||||
_cancel: CancellationToken,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
block_or_unblock_gc(request, true).await
|
||||
}
|
||||
|
||||
async fn timeline_gc_unblocking_handler(
|
||||
request: Request<Body>,
|
||||
_cancel: CancellationToken,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
block_or_unblock_gc(request, false).await
|
||||
}
|
||||
|
||||
/// Adding a block is `POST ../block_gc`, removing a block is `POST ../unblock_gc`.
|
||||
///
|
||||
/// Both are technically unsafe because they might fire off index uploads, thus they are POST.
|
||||
async fn block_or_unblock_gc(
|
||||
request: Request<Body>,
|
||||
block: bool,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
use crate::tenant::{
|
||||
remote_timeline_client::WaitCompletionError, upload_queue::NotInitialized,
|
||||
};
|
||||
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
|
||||
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
|
||||
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
|
||||
let state = get_state(&request);
|
||||
|
||||
let tenant = state
|
||||
.tenant_manager
|
||||
.get_attached_tenant_shard(tenant_shard_id)?;
|
||||
|
||||
tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
|
||||
|
||||
let timeline = tenant.get_timeline(timeline_id, true)?;
|
||||
|
||||
let fut = async {
|
||||
if block {
|
||||
timeline.block_gc(&tenant).await.map(|_| ())
|
||||
} else {
|
||||
timeline.unblock_gc(&tenant).await
|
||||
}
|
||||
};
|
||||
|
||||
let span = tracing::info_span!(
|
||||
"block_or_unblock_gc",
|
||||
tenant_id = %tenant_shard_id.tenant_id,
|
||||
shard_id = %tenant_shard_id.shard_slug(),
|
||||
timeline_id = %timeline_id,
|
||||
block = block,
|
||||
);
|
||||
|
||||
let res = fut.instrument(span).await;
|
||||
|
||||
res.map_err(|e| {
|
||||
if e.is::<NotInitialized>() || e.is::<WaitCompletionError>() {
|
||||
ApiError::ShuttingDown
|
||||
} else {
|
||||
ApiError::InternalServerError(e)
|
||||
}
|
||||
})?;
|
||||
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
/// Get tenant_size SVG graph along with the JSON data.
|
||||
fn synthetic_size_html_response(
|
||||
inputs: ModelInputs,
|
||||
@@ -2904,6 +2972,14 @@ pub fn make_router(
|
||||
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/layer/:layer_file_name",
|
||||
|r| api_handler(r, evict_timeline_layer_handler),
|
||||
)
|
||||
.post(
|
||||
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/block_gc",
|
||||
|r| api_handler(r, timeline_gc_blocking_handler),
|
||||
)
|
||||
.post(
|
||||
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/unblock_gc",
|
||||
|r| api_handler(r, timeline_gc_unblocking_handler),
|
||||
)
|
||||
.post("/v1/tenant/:tenant_shard_id/heatmap_upload", |r| {
|
||||
api_handler(r, secondary_upload_handler)
|
||||
})
|
||||
|
||||
@@ -155,9 +155,17 @@ async fn import_rel(
|
||||
//
|
||||
// FIXME: Keep track of which relations we've already created?
|
||||
// https://github.com/neondatabase/neon/issues/3309
|
||||
modification
|
||||
if let Err(e) = modification
|
||||
.put_rel_creation(rel, nblocks as u32, ctx)
|
||||
.await?;
|
||||
.await
|
||||
{
|
||||
match e {
|
||||
RelationError::AlreadyExists => {
|
||||
debug!("Relation {} already exist. We must be extending it.", rel)
|
||||
}
|
||||
_ => return Err(e.into()),
|
||||
}
|
||||
}
|
||||
|
||||
loop {
|
||||
let r = reader.read_exact(&mut buf).await;
|
||||
|
||||
@@ -174,7 +174,6 @@ impl Timeline {
|
||||
pending_deletions: Vec::new(),
|
||||
pending_nblocks: 0,
|
||||
pending_directory_entries: Vec::new(),
|
||||
metadata_state: MetadataWriteState::new(self),
|
||||
lsn,
|
||||
}
|
||||
}
|
||||
@@ -1035,229 +1034,6 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
/// Write something other than a simple postgres key/value: unlike regular relation page writes, these
|
||||
/// require access to a Timeline in order to do read-modify-write.
|
||||
#[derive(Debug)]
|
||||
enum MetadataOp {
|
||||
// - Insert to DBDIR_KEY if this (spcnode, dbnode) does not already exist
|
||||
// - Insert to rel_dir_to_key(spcnode, dbnode)
|
||||
UpsertRelDirectory { spcnode: Oid, dbnode: Oid },
|
||||
UpsertRelDirectory2 { rel: RelTag, nblocks: BlockNumber },
|
||||
// - Insert this xid to TWOPHASEDIR_KEY
|
||||
// UpdateTwoPhaseDir{
|
||||
// xid: TransactionId
|
||||
// },
|
||||
// // - Drop this (spcnode, dbnode) from DBDIR_KEY
|
||||
// DropDbDir {
|
||||
// spcnode: Oid,
|
||||
// dbnode: Oid
|
||||
// },
|
||||
// // - Drop this (spcnode, dbnode) from DBDIR_KEY
|
||||
// DropRel {
|
||||
// rel: RelTag,
|
||||
// },
|
||||
// // - Read-subtract-write the relation size for this rel
|
||||
// RelTruncate {
|
||||
// rel: RelTag,
|
||||
// nblocks: BlockNumber
|
||||
// },
|
||||
// // - Read-add-write the relation size for this rel
|
||||
// RelExtend {
|
||||
// rel: RelTag,
|
||||
// nblocks: BlockNumber
|
||||
// },
|
||||
// // - Read-modify-write of `slru_dir_to_key` and `slru_segment_size_to_key`
|
||||
// CreateSlruSegment {
|
||||
// kind: SlruKind,
|
||||
// segno: u32,
|
||||
// nblocks: BlockNumber,
|
||||
// }
|
||||
}
|
||||
|
||||
/// State that spans all the apply() calls of all the MetadataOp in a DatadirMotification
|
||||
struct MetadataWriteState<'a> {
|
||||
/// The timeline this modification applies to. You can access this to
|
||||
/// read the state, but note that any pending updates are *not* reflected
|
||||
/// in the state in 'tline' yet.
|
||||
pub tline: &'a Timeline,
|
||||
|
||||
// Write-through cache.
|
||||
// For pages that we read-modify-write, stash the last value here after each MetadataOp,
|
||||
// so that we don't have to enter Timeline::get more than necessary.
|
||||
last_write: HashMap<Key, Bytes>,
|
||||
|
||||
/// For special "directory" keys that store key-value maps, track the size of the map
|
||||
/// if it was updated in this modification.
|
||||
pending_directory_entries: Vec<(DirectoryKind, usize)>,
|
||||
|
||||
// Debug assertions: for calls that we expect to always come in LSN order, track the last LSN we saw
|
||||
#[cfg(debug_assertions)]
|
||||
debug_last_lsn: Lsn,
|
||||
}
|
||||
|
||||
impl<'a> MetadataWriteState<'a> {
|
||||
fn new(timeline: &'a Timeline) -> Self {
|
||||
if cfg!(debug_assertions) {
|
||||
Self {
|
||||
tline: timeline,
|
||||
last_write: HashMap::default(),
|
||||
pending_directory_entries: Vec::default(),
|
||||
debug_last_lsn: Lsn(0),
|
||||
}
|
||||
} else {
|
||||
Self {
|
||||
tline: timeline,
|
||||
last_write: HashMap::default(),
|
||||
pending_directory_entries: Vec::default(),
|
||||
debug_last_lsn: Lsn(0),
|
||||
}
|
||||
}
|
||||
}
|
||||
fn assert_lsn_order(&mut self, lsn: Lsn) {
|
||||
#[cfg(debug_assertions)]
|
||||
{
|
||||
debug_assert!(lsn >= self.debug_last_lsn);
|
||||
self.debug_last_lsn = lsn;
|
||||
}
|
||||
}
|
||||
|
||||
async fn get(
|
||||
&mut self,
|
||||
lsn: Lsn,
|
||||
key: Key,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Bytes, PageReconstructError> {
|
||||
self.assert_lsn_order(lsn);
|
||||
|
||||
match self.last_write.get(&key) {
|
||||
Some(v) => Ok(v.clone()),
|
||||
None => self.tline.get(key, lsn, ctx).await,
|
||||
}
|
||||
}
|
||||
|
||||
/// Observe a page write
|
||||
fn put(&mut self, lsn: Lsn, key: Key, value: Bytes) {
|
||||
self.assert_lsn_order(lsn);
|
||||
|
||||
self.last_write.insert(key, value);
|
||||
}
|
||||
}
|
||||
|
||||
impl MetadataOp {
|
||||
async fn apply<'a>(
|
||||
self,
|
||||
lsn: Lsn,
|
||||
data_dir_mod: &mut DatadirModification<'a>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
eprintln!("MetadataOp::apply: {self:?}");
|
||||
match self {
|
||||
Self::UpsertRelDirectory { spcnode, dbnode } => {
|
||||
// Add it to the directory (if it doesn't exist already)
|
||||
let buf = data_dir_mod.metadata_state.get(lsn, DBDIR_KEY, ctx).await?;
|
||||
let mut dbdir = DbDirectory::des(&buf)?;
|
||||
|
||||
let r = dbdir.dbdirs.insert((spcnode, dbnode), true);
|
||||
if r.is_none() || r == Some(false) {
|
||||
// The dbdir entry didn't exist, or it contained a
|
||||
// 'false'. The 'insert' call already updated it with
|
||||
// 'true', now write the updated 'dbdirs' map back.
|
||||
let buf = DbDirectory::ser(&dbdir)?;
|
||||
data_dir_mod.put_metadata_page(lsn, DBDIR_KEY, Bytes::from(buf));
|
||||
}
|
||||
if r.is_none() {
|
||||
// Create RelDirectory
|
||||
let buf = RelDirectory::ser(&RelDirectory {
|
||||
rels: HashSet::new(),
|
||||
})?;
|
||||
data_dir_mod
|
||||
.metadata_state
|
||||
.pending_directory_entries
|
||||
.push((DirectoryKind::Rel, 0));
|
||||
data_dir_mod.put_metadata_page(
|
||||
lsn,
|
||||
rel_dir_to_key(spcnode, dbnode),
|
||||
Bytes::from(buf),
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Self::UpsertRelDirectory2 { rel, nblocks } => {
|
||||
// It's possible that this is the first rel for this db in this
|
||||
// tablespace. Create the reldir entry for it if so.
|
||||
let mut dbdir = DbDirectory::des(
|
||||
&data_dir_mod
|
||||
.metadata_state
|
||||
.get(lsn, DBDIR_KEY, ctx)
|
||||
.await
|
||||
.context("read db")?,
|
||||
)
|
||||
.context("deserialize db")?;
|
||||
let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
|
||||
let mut rel_dir = if let hash_map::Entry::Vacant(e) =
|
||||
dbdir.dbdirs.entry((rel.spcnode, rel.dbnode))
|
||||
{
|
||||
// Didn't exist. Update dbdir
|
||||
e.insert(false);
|
||||
let buf = DbDirectory::ser(&dbdir).context("serialize db")?;
|
||||
data_dir_mod
|
||||
.metadata_state
|
||||
.pending_directory_entries
|
||||
.push((DirectoryKind::Db, dbdir.dbdirs.len()));
|
||||
data_dir_mod.put_metadata_page(lsn, DBDIR_KEY, buf.into());
|
||||
|
||||
// and create the RelDirectory
|
||||
RelDirectory::default()
|
||||
} else {
|
||||
// reldir already exists, fetch it
|
||||
RelDirectory::des(
|
||||
&data_dir_mod
|
||||
.metadata_state
|
||||
.get(lsn, rel_dir_key, ctx)
|
||||
.await
|
||||
.context("read db")?,
|
||||
)
|
||||
.context("deserialize db")?
|
||||
};
|
||||
|
||||
// Add the new relation to the rel directory entry, and write it back
|
||||
if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
|
||||
// Drop out early if the relation already existed
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
data_dir_mod
|
||||
.metadata_state
|
||||
.pending_directory_entries
|
||||
.push((DirectoryKind::Rel, rel_dir.rels.len()));
|
||||
|
||||
data_dir_mod.put_metadata_page(
|
||||
lsn,
|
||||
rel_dir_key,
|
||||
Bytes::from(RelDirectory::ser(&rel_dir).context("serialize")?),
|
||||
);
|
||||
|
||||
// Put size
|
||||
let size_key = rel_size_to_key(rel);
|
||||
let buf = nblocks.to_le_bytes();
|
||||
data_dir_mod.put_metadata_page(lsn, size_key, Bytes::from(buf.to_vec()));
|
||||
|
||||
data_dir_mod.pending_nblocks += nblocks as i64;
|
||||
|
||||
// Update relation size cache
|
||||
data_dir_mod
|
||||
.metadata_state
|
||||
.tline
|
||||
.set_cached_rel_size(rel, lsn, nblocks);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// DatadirModification represents an operation to ingest an atomic set of
|
||||
/// updates to the repository. It is created by the 'begin_record'
|
||||
/// function. It is called for each WAL record, so that all the modifications
|
||||
@@ -1279,8 +1055,6 @@ pub struct DatadirModification<'a> {
|
||||
pending_deletions: Vec<(Range<Key>, Lsn)>,
|
||||
pending_nblocks: i64,
|
||||
|
||||
metadata_state: MetadataWriteState<'a>,
|
||||
|
||||
/// For special "directory" keys that store key-value maps, track the size of the map
|
||||
/// if it was updated in this modification.
|
||||
pending_directory_entries: Vec<(DirectoryKind, usize)>,
|
||||
@@ -1307,26 +1081,6 @@ impl<'a> DatadirModification<'a> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Apply a complex write op that may require read-modify-write to the underlying Timeline.
|
||||
async fn put_metadata_op(
|
||||
&mut self,
|
||||
lsn: Lsn,
|
||||
meta: MetadataOp,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
// Placeholder: just run inline.
|
||||
// TODO: make this sync, and defer all these to commit(), so that we don't have to carry a Timeline all the time.
|
||||
meta.apply(lsn, self, ctx).await
|
||||
}
|
||||
|
||||
/// While applying a metadata op, write a materialized page.
|
||||
fn put_metadata_page(&mut self, lsn: Lsn, key: Key, value: Bytes) {
|
||||
eprintln!("put_metadata_page {key} @ {lsn}");
|
||||
self.put_at_lsn(lsn, key, Value::Image(value.clone()));
|
||||
|
||||
self.metadata_state.put(lsn, key, value);
|
||||
}
|
||||
|
||||
/// Initialize a completely new repository.
|
||||
///
|
||||
/// This inserts the directory metadata entries that are assumed to
|
||||
@@ -1338,6 +1092,9 @@ impl<'a> DatadirModification<'a> {
|
||||
self.pending_directory_entries.push((DirectoryKind::Db, 0));
|
||||
self.put(DBDIR_KEY, Value::Image(buf.into()));
|
||||
|
||||
// Create AuxFilesDirectory
|
||||
self.init_aux_dir()?;
|
||||
|
||||
let buf = TwoPhaseDirectory::ser(&TwoPhaseDirectory {
|
||||
xids: HashSet::new(),
|
||||
})?;
|
||||
@@ -1439,12 +1196,33 @@ impl<'a> DatadirModification<'a> {
|
||||
img: Bytes,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
self.put_metadata_op(
|
||||
self.lsn,
|
||||
MetadataOp::UpsertRelDirectory { spcnode, dbnode },
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
// Add it to the directory (if it doesn't exist already)
|
||||
let buf = self.get(DBDIR_KEY, ctx).await?;
|
||||
let mut dbdir = DbDirectory::des(&buf)?;
|
||||
|
||||
let r = dbdir.dbdirs.insert((spcnode, dbnode), true);
|
||||
if r.is_none() || r == Some(false) {
|
||||
// The dbdir entry didn't exist, or it contained a
|
||||
// 'false'. The 'insert' call already updated it with
|
||||
// 'true', now write the updated 'dbdirs' map back.
|
||||
let buf = DbDirectory::ser(&dbdir)?;
|
||||
self.put(DBDIR_KEY, Value::Image(buf.into()));
|
||||
|
||||
// Create AuxFilesDirectory as well
|
||||
self.init_aux_dir()?;
|
||||
}
|
||||
if r.is_none() {
|
||||
// Create RelDirectory
|
||||
let buf = RelDirectory::ser(&RelDirectory {
|
||||
rels: HashSet::new(),
|
||||
})?;
|
||||
self.pending_directory_entries.push((DirectoryKind::Rel, 0));
|
||||
self.put(
|
||||
rel_dir_to_key(spcnode, dbnode),
|
||||
Value::Image(Bytes::from(buf)),
|
||||
);
|
||||
}
|
||||
|
||||
self.put(relmap_file_key(spcnode, dbnode), Value::Image(img));
|
||||
Ok(())
|
||||
}
|
||||
@@ -1538,15 +1316,56 @@ impl<'a> DatadirModification<'a> {
|
||||
rel: RelTag,
|
||||
nblocks: BlockNumber,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
// TODO: here, or earlier, validate that rel.relnode != 0 -- perhaps on construction of the RelTag?
|
||||
) -> Result<(), RelationError> {
|
||||
if rel.relnode == 0 {
|
||||
return Err(RelationError::InvalidRelnode);
|
||||
}
|
||||
// It's possible that this is the first rel for this db in this
|
||||
// tablespace. Create the reldir entry for it if so.
|
||||
let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await.context("read db")?)
|
||||
.context("deserialize db")?;
|
||||
let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
|
||||
let mut rel_dir =
|
||||
if let hash_map::Entry::Vacant(e) = dbdir.dbdirs.entry((rel.spcnode, rel.dbnode)) {
|
||||
// Didn't exist. Update dbdir
|
||||
e.insert(false);
|
||||
let buf = DbDirectory::ser(&dbdir).context("serialize db")?;
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::Db, dbdir.dbdirs.len()));
|
||||
self.put(DBDIR_KEY, Value::Image(buf.into()));
|
||||
|
||||
self.put_metadata_op(
|
||||
self.lsn,
|
||||
MetadataOp::UpsertRelDirectory2 { rel, nblocks },
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
// and create the RelDirectory
|
||||
RelDirectory::default()
|
||||
} else {
|
||||
// reldir already exists, fetch it
|
||||
RelDirectory::des(&self.get(rel_dir_key, ctx).await.context("read db")?)
|
||||
.context("deserialize db")?
|
||||
};
|
||||
|
||||
// Add the new relation to the rel directory entry, and write it back
|
||||
if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
|
||||
return Err(RelationError::AlreadyExists);
|
||||
}
|
||||
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::Rel, rel_dir.rels.len()));
|
||||
|
||||
self.put(
|
||||
rel_dir_key,
|
||||
Value::Image(Bytes::from(
|
||||
RelDirectory::ser(&rel_dir).context("serialize")?,
|
||||
)),
|
||||
);
|
||||
|
||||
// Put size
|
||||
let size_key = rel_size_to_key(rel);
|
||||
let buf = nblocks.to_le_bytes();
|
||||
self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
|
||||
|
||||
self.pending_nblocks += nblocks as i64;
|
||||
|
||||
// Update relation size cache
|
||||
self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
|
||||
|
||||
// Even if nblocks > 0, we don't insert any actual blocks here. That's up to the
|
||||
// caller.
|
||||
@@ -1751,6 +1570,19 @@ impl<'a> DatadirModification<'a> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn init_aux_dir(&mut self) -> anyhow::Result<()> {
|
||||
if let AuxFilePolicy::V2 = self.tline.get_switch_aux_file_policy() {
|
||||
return Ok(());
|
||||
}
|
||||
let buf = AuxFilesDirectory::ser(&AuxFilesDirectory {
|
||||
files: HashMap::new(),
|
||||
})?;
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::AuxFiles, 0));
|
||||
self.put(AUX_FILES_KEY, Value::Image(Bytes::from(buf)));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn put_file(
|
||||
&mut self,
|
||||
path: &str,
|
||||
@@ -2045,7 +1877,6 @@ impl<'a> DatadirModification<'a> {
|
||||
|
||||
// Internal helper functions to batch the modifications
|
||||
|
||||
// TODO: retire this once all metadata writes are going via MetadataWriteState
|
||||
async fn get(&self, key: Key, ctx: &RequestContext) -> Result<Bytes, PageReconstructError> {
|
||||
// Have we already updated the same key? Read the latest pending updated
|
||||
// version in that case.
|
||||
@@ -2079,19 +1910,15 @@ impl<'a> DatadirModification<'a> {
|
||||
}
|
||||
|
||||
fn put(&mut self, key: Key, val: Value) {
|
||||
self.put_at_lsn(self.lsn, key, val)
|
||||
}
|
||||
|
||||
fn put_at_lsn(&mut self, lsn: Lsn, key: Key, val: Value) {
|
||||
let values = self.pending_updates.entry(key).or_default();
|
||||
// Replace the previous value if it exists at the same lsn
|
||||
if let Some((last_lsn, last_value)) = values.last_mut() {
|
||||
if *last_lsn == lsn {
|
||||
if *last_lsn == self.lsn {
|
||||
*last_value = val;
|
||||
return;
|
||||
}
|
||||
}
|
||||
values.push((lsn, val));
|
||||
values.push((self.lsn, val));
|
||||
}
|
||||
|
||||
fn delete(&mut self, key_range: Range<Key>) {
|
||||
|
||||
@@ -148,6 +148,7 @@ pub(crate) mod timeline;
|
||||
|
||||
pub mod size;
|
||||
|
||||
mod gc_block;
|
||||
pub(crate) mod throttle;
|
||||
|
||||
pub(crate) use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
|
||||
@@ -303,6 +304,12 @@ pub struct Tenant {
|
||||
/// An ongoing timeline detach must be checked during attempts to GC or compact a timeline.
|
||||
ongoing_timeline_detach: std::sync::Mutex<Option<(TimelineId, utils::completion::Barrier)>>,
|
||||
|
||||
/// `index_part.json` based gc blocking reason tracking.
|
||||
///
|
||||
/// New gc iterations must start a new iteration by acquiring `GcBlock::start` before
|
||||
/// proceeding.
|
||||
pub(crate) gc_block: gc_block::GcBlock,
|
||||
|
||||
l0_flush_global_state: L0FlushGlobalState,
|
||||
}
|
||||
|
||||
@@ -1036,6 +1043,8 @@ impl Tenant {
|
||||
}
|
||||
}
|
||||
|
||||
let mut gc_blocks = HashMap::new();
|
||||
|
||||
// For every timeline, download the metadata file, scan the local directory,
|
||||
// and build a layer map that contains an entry for each remote and local
|
||||
// layer file.
|
||||
@@ -1045,6 +1054,16 @@ impl Tenant {
|
||||
.remove(&timeline_id)
|
||||
.expect("just put it in above");
|
||||
|
||||
if let Some(blocking) = index_part.gc_blocking.as_ref() {
|
||||
// could just filter these away, but it helps while testing
|
||||
anyhow::ensure!(
|
||||
!blocking.reasons.is_empty(),
|
||||
"index_part for {timeline_id} is malformed: it should not have gc blocking with zero reasons"
|
||||
);
|
||||
let prev = gc_blocks.insert(timeline_id, blocking.reasons);
|
||||
assert!(prev.is_none());
|
||||
}
|
||||
|
||||
// TODO again handle early failure
|
||||
self.load_remote_timeline(
|
||||
timeline_id,
|
||||
@@ -1089,6 +1108,8 @@ impl Tenant {
|
||||
// IndexPart is the source of truth.
|
||||
self.clean_up_timelines(&existent_timelines)?;
|
||||
|
||||
self.gc_block.set_scanned(gc_blocks);
|
||||
|
||||
fail::fail_point!("attach-before-activate", |_| {
|
||||
anyhow::bail!("attach-before-activate");
|
||||
});
|
||||
@@ -1679,6 +1700,14 @@ impl Tenant {
|
||||
}
|
||||
}
|
||||
|
||||
let _guard = match self.gc_block.start().await {
|
||||
Ok(guard) => guard,
|
||||
Err(reasons) => {
|
||||
info!("Skipping GC: {reasons}");
|
||||
return Ok(GcResult::default());
|
||||
}
|
||||
};
|
||||
|
||||
self.gc_iteration_internal(target_timeline_id, horizon, pitr, cancel, ctx)
|
||||
.await
|
||||
}
|
||||
@@ -2691,6 +2720,7 @@ impl Tenant {
|
||||
)),
|
||||
tenant_conf: Arc::new(ArcSwap::from_pointee(attached_conf)),
|
||||
ongoing_timeline_detach: std::sync::Mutex::default(),
|
||||
gc_block: Default::default(),
|
||||
l0_flush_global_state,
|
||||
}
|
||||
}
|
||||
@@ -4092,7 +4122,7 @@ pub(crate) mod harness {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::collections::BTreeMap;
|
||||
use std::collections::{BTreeMap, BTreeSet};
|
||||
|
||||
use super::*;
|
||||
use crate::keyspace::KeySpaceAccum;
|
||||
@@ -4767,7 +4797,7 @@ mod tests {
|
||||
lsn: Lsn,
|
||||
repeat: usize,
|
||||
key_count: usize,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> anyhow::Result<HashMap<Key, BTreeSet<Lsn>>> {
|
||||
let compact = true;
|
||||
bulk_insert_maybe_compact_gc(tenant, timeline, ctx, lsn, repeat, key_count, compact).await
|
||||
}
|
||||
@@ -4780,7 +4810,9 @@ mod tests {
|
||||
repeat: usize,
|
||||
key_count: usize,
|
||||
compact: bool,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> anyhow::Result<HashMap<Key, BTreeSet<Lsn>>> {
|
||||
let mut inserted: HashMap<Key, BTreeSet<Lsn>> = Default::default();
|
||||
|
||||
let mut test_key = Key::from_hex("010000000033333333444444445500000000").unwrap();
|
||||
let mut blknum = 0;
|
||||
|
||||
@@ -4801,6 +4833,7 @@ mod tests {
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
inserted.entry(test_key).or_default().insert(lsn);
|
||||
writer.finish_write(lsn);
|
||||
drop(writer);
|
||||
|
||||
@@ -4825,7 +4858,7 @@ mod tests {
|
||||
assert_eq!(res.layers_removed, 0, "this never removes anything");
|
||||
}
|
||||
|
||||
Ok(())
|
||||
Ok(inserted)
|
||||
}
|
||||
|
||||
//
|
||||
@@ -4872,7 +4905,7 @@ mod tests {
|
||||
.await?;
|
||||
|
||||
let lsn = Lsn(0x10);
|
||||
bulk_insert_compact_gc(&tenant, &tline, &ctx, lsn, 50, 10000).await?;
|
||||
let inserted = bulk_insert_compact_gc(&tenant, &tline, &ctx, lsn, 50, 10000).await?;
|
||||
|
||||
let guard = tline.layers.read().await;
|
||||
guard.layer_map().dump(true, &ctx).await?;
|
||||
@@ -4933,9 +4966,39 @@ mod tests {
|
||||
&ctx,
|
||||
)
|
||||
.await;
|
||||
tline
|
||||
.validate_get_vectored_impl(&vectored_res, read, reads_lsn, &ctx)
|
||||
.await;
|
||||
|
||||
let mut expected_lsns: HashMap<Key, Lsn> = Default::default();
|
||||
let mut expect_missing = false;
|
||||
let mut key = read.start().unwrap();
|
||||
while key != read.end().unwrap() {
|
||||
if let Some(lsns) = inserted.get(&key) {
|
||||
let expected_lsn = lsns.iter().rfind(|lsn| **lsn <= reads_lsn);
|
||||
match expected_lsn {
|
||||
Some(lsn) => {
|
||||
expected_lsns.insert(key, *lsn);
|
||||
}
|
||||
None => {
|
||||
expect_missing = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
expect_missing = true;
|
||||
break;
|
||||
}
|
||||
|
||||
key = key.next();
|
||||
}
|
||||
|
||||
if expect_missing {
|
||||
assert!(matches!(vectored_res, Err(GetVectoredError::MissingKey(_))));
|
||||
} else {
|
||||
for (key, image) in vectored_res? {
|
||||
let expected_lsn = expected_lsns.get(&key).expect("determined above");
|
||||
let expected_image = test_img(&format!("{} at {}", key.field6, expected_lsn));
|
||||
assert_eq!(image?, expected_image);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -4985,10 +5048,6 @@ mod tests {
|
||||
)
|
||||
.await;
|
||||
|
||||
child_timeline
|
||||
.validate_get_vectored_impl(&vectored_res, aux_keyspace, read_lsn, &ctx)
|
||||
.await;
|
||||
|
||||
let images = vectored_res?;
|
||||
assert!(images.is_empty());
|
||||
Ok(())
|
||||
@@ -6899,7 +6958,10 @@ mod tests {
|
||||
}
|
||||
|
||||
let cancel = CancellationToken::new();
|
||||
tline.compact_with_gc(&cancel, &ctx).await.unwrap();
|
||||
tline
|
||||
.compact_with_gc(&cancel, EnumSet::new(), &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
for (idx, expected) in expected_result.iter().enumerate() {
|
||||
assert_eq!(
|
||||
@@ -6993,7 +7055,10 @@ mod tests {
|
||||
guard.cutoffs.time = Lsn(0x40);
|
||||
guard.cutoffs.space = Lsn(0x40);
|
||||
}
|
||||
tline.compact_with_gc(&cancel, &ctx).await.unwrap();
|
||||
tline
|
||||
.compact_with_gc(&cancel, EnumSet::new(), &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -7327,7 +7392,10 @@ mod tests {
|
||||
}
|
||||
|
||||
let cancel = CancellationToken::new();
|
||||
tline.compact_with_gc(&cancel, &ctx).await.unwrap();
|
||||
tline
|
||||
.compact_with_gc(&cancel, EnumSet::new(), &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
for idx in 0..10 {
|
||||
assert_eq!(
|
||||
@@ -7353,7 +7421,10 @@ mod tests {
|
||||
guard.cutoffs.time = Lsn(0x40);
|
||||
guard.cutoffs.space = Lsn(0x40);
|
||||
}
|
||||
tline.compact_with_gc(&cancel, &ctx).await.unwrap();
|
||||
tline
|
||||
.compact_with_gc(&cancel, EnumSet::new(), &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -7898,11 +7969,28 @@ mod tests {
|
||||
verify_result().await;
|
||||
|
||||
let cancel = CancellationToken::new();
|
||||
tline.compact_with_gc(&cancel, &ctx).await.unwrap();
|
||||
let mut dryrun_flags = EnumSet::new();
|
||||
dryrun_flags.insert(CompactFlags::DryRun);
|
||||
|
||||
tline
|
||||
.compact_with_gc(&cancel, dryrun_flags, &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
// We expect layer map to be the same b/c the dry run flag, but we don't know whether there will be other background jobs
|
||||
// cleaning things up, and therefore, we don't do sanity checks on the layer map during unit tests.
|
||||
verify_result().await;
|
||||
|
||||
tline
|
||||
.compact_with_gc(&cancel, EnumSet::new(), &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
verify_result().await;
|
||||
|
||||
// compact again
|
||||
tline.compact_with_gc(&cancel, &ctx).await.unwrap();
|
||||
tline
|
||||
.compact_with_gc(&cancel, EnumSet::new(), &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
verify_result().await;
|
||||
|
||||
// increase GC horizon and compact again
|
||||
@@ -7912,11 +8000,17 @@ mod tests {
|
||||
guard.cutoffs.time = Lsn(0x38);
|
||||
guard.cutoffs.space = Lsn(0x38);
|
||||
}
|
||||
tline.compact_with_gc(&cancel, &ctx).await.unwrap();
|
||||
tline
|
||||
.compact_with_gc(&cancel, EnumSet::new(), &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
verify_result().await; // no wals between 0x30 and 0x38, so we should obtain the same result
|
||||
|
||||
// not increasing the GC horizon and compact again
|
||||
tline.compact_with_gc(&cancel, &ctx).await.unwrap();
|
||||
tline
|
||||
.compact_with_gc(&cancel, EnumSet::new(), &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
verify_result().await;
|
||||
|
||||
Ok(())
|
||||
@@ -8097,7 +8191,10 @@ mod tests {
|
||||
verify_result().await;
|
||||
|
||||
let cancel = CancellationToken::new();
|
||||
branch_tline.compact_with_gc(&cancel, &ctx).await.unwrap();
|
||||
branch_tline
|
||||
.compact_with_gc(&cancel, EnumSet::new(), &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
verify_result().await;
|
||||
|
||||
|
||||
213
pageserver/src/tenant/gc_block.rs
Normal file
213
pageserver/src/tenant/gc_block.rs
Normal file
@@ -0,0 +1,213 @@
|
||||
use std::collections::HashMap;
|
||||
|
||||
use utils::id::TimelineId;
|
||||
|
||||
use super::remote_timeline_client::index::GcBlockingReason;
|
||||
|
||||
type Storage = HashMap<TimelineId, enumset::EnumSet<GcBlockingReason>>;
|
||||
|
||||
#[derive(Default)]
|
||||
pub(crate) struct GcBlock {
|
||||
/// The timelines which have current reasons to block gc.
|
||||
///
|
||||
/// LOCK ORDER: this is held locked while scheduling the next index_part update. This is done
|
||||
/// to keep the this field up to date with RemoteTimelineClient `upload_queue.dirty`.
|
||||
reasons: std::sync::Mutex<Storage>,
|
||||
blocking: tokio::sync::Mutex<()>,
|
||||
}
|
||||
|
||||
impl GcBlock {
|
||||
/// Start another gc iteration.
|
||||
///
|
||||
/// Returns a guard to be held for the duration of gc iteration to allow synchronizing with
|
||||
/// it's ending, or if not currently possible, a value describing the reasons why not.
|
||||
///
|
||||
/// Cancellation safe.
|
||||
pub(super) async fn start(&self) -> Result<Guard<'_>, BlockingReasons> {
|
||||
let reasons = {
|
||||
let g = self.reasons.lock().unwrap();
|
||||
|
||||
// TODO: the assumption is that this method gets called periodically. in prod, we use 1h, in
|
||||
// tests, we use everything. we should warn if the gc has been consecutively blocked
|
||||
// for more than 1h (within single tenant session?).
|
||||
BlockingReasons::clean_and_summarize(g)
|
||||
};
|
||||
|
||||
if let Some(reasons) = reasons {
|
||||
Err(reasons)
|
||||
} else {
|
||||
Ok(Guard {
|
||||
_inner: self.blocking.lock().await,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn summary(&self) -> Option<BlockingReasons> {
|
||||
let g = self.reasons.lock().unwrap();
|
||||
|
||||
BlockingReasons::summarize(&g)
|
||||
}
|
||||
|
||||
/// Start blocking gc for this one timeline for the given reason.
|
||||
///
|
||||
/// This is not a guard based API but instead it mimics set API. The returned future will not
|
||||
/// resolve until an existing gc round has completed.
|
||||
///
|
||||
/// Returns true if this block was new, false if gc was already blocked for this reason.
|
||||
///
|
||||
/// Cancellation safe: cancelling after first poll will keep the reason to block gc, but will
|
||||
/// keep the gc blocking reason.
|
||||
pub(crate) async fn insert(
|
||||
&self,
|
||||
timeline: &super::Timeline,
|
||||
reason: GcBlockingReason,
|
||||
) -> anyhow::Result<bool> {
|
||||
let (added, uploaded) = {
|
||||
let mut g = self.reasons.lock().unwrap();
|
||||
let set = g.entry(timeline.timeline_id).or_default();
|
||||
let added = set.insert(reason);
|
||||
|
||||
// LOCK ORDER: intentionally hold the lock, see self.reasons.
|
||||
let uploaded = timeline
|
||||
.remote_client
|
||||
.schedule_insert_gc_block_reason(reason)?;
|
||||
|
||||
(added, uploaded)
|
||||
};
|
||||
|
||||
uploaded.await?;
|
||||
|
||||
// ensure that any ongoing gc iteration has completed
|
||||
drop(self.blocking.lock().await);
|
||||
|
||||
Ok(added)
|
||||
}
|
||||
|
||||
/// Remove blocking gc for this one timeline and the given reason.
|
||||
pub(crate) async fn remove(
|
||||
&self,
|
||||
timeline: &super::Timeline,
|
||||
reason: GcBlockingReason,
|
||||
) -> anyhow::Result<()> {
|
||||
use std::collections::hash_map::Entry;
|
||||
|
||||
super::span::debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
let (remaining_blocks, uploaded) = {
|
||||
let mut g = self.reasons.lock().unwrap();
|
||||
match g.entry(timeline.timeline_id) {
|
||||
Entry::Occupied(mut oe) => {
|
||||
let set = oe.get_mut();
|
||||
set.remove(reason);
|
||||
if set.is_empty() {
|
||||
oe.remove();
|
||||
}
|
||||
}
|
||||
Entry::Vacant(_) => {
|
||||
// we must still do the index_part.json update regardless, in case we had earlier
|
||||
// been cancelled
|
||||
}
|
||||
}
|
||||
|
||||
let remaining_blocks = g.len();
|
||||
|
||||
// LOCK ORDER: intentionally hold the lock while scheduling; see self.reasons
|
||||
let uploaded = timeline
|
||||
.remote_client
|
||||
.schedule_remove_gc_block_reason(reason)?;
|
||||
|
||||
(remaining_blocks, uploaded)
|
||||
};
|
||||
uploaded.await?;
|
||||
|
||||
// no need to synchronize with gc iteration again
|
||||
|
||||
if remaining_blocks > 0 {
|
||||
tracing::info!(remaining_blocks, removed=?reason, "gc blocking removed, but gc remains blocked");
|
||||
} else {
|
||||
tracing::info!("gc is now unblocked for the tenant");
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn before_delete(&self, timeline: &super::Timeline) {
|
||||
let unblocked = {
|
||||
let mut g = self.reasons.lock().unwrap();
|
||||
if g.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
g.remove(&timeline.timeline_id);
|
||||
|
||||
BlockingReasons::clean_and_summarize(g).is_none()
|
||||
};
|
||||
|
||||
if unblocked {
|
||||
tracing::info!("gc is now unblocked following deletion");
|
||||
}
|
||||
}
|
||||
|
||||
/// Initialize with the non-deleted timelines of this tenant.
|
||||
pub(crate) fn set_scanned(&self, scanned: Storage) {
|
||||
let mut g = self.reasons.lock().unwrap();
|
||||
assert!(g.is_empty());
|
||||
g.extend(scanned.into_iter().filter(|(_, v)| !v.is_empty()));
|
||||
|
||||
if let Some(reasons) = BlockingReasons::clean_and_summarize(g) {
|
||||
tracing::info!(summary=?reasons, "initialized with gc blocked");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) struct Guard<'a> {
|
||||
_inner: tokio::sync::MutexGuard<'a, ()>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct BlockingReasons {
|
||||
timelines: usize,
|
||||
reasons: enumset::EnumSet<GcBlockingReason>,
|
||||
}
|
||||
|
||||
impl std::fmt::Display for BlockingReasons {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(
|
||||
f,
|
||||
"{} timelines block for {:?}",
|
||||
self.timelines, self.reasons
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl BlockingReasons {
|
||||
fn clean_and_summarize(mut g: std::sync::MutexGuard<'_, Storage>) -> Option<Self> {
|
||||
let mut reasons = enumset::EnumSet::empty();
|
||||
g.retain(|_key, value| {
|
||||
reasons = reasons.union(*value);
|
||||
!value.is_empty()
|
||||
});
|
||||
if !g.is_empty() {
|
||||
Some(BlockingReasons {
|
||||
timelines: g.len(),
|
||||
reasons,
|
||||
})
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
fn summarize(g: &std::sync::MutexGuard<'_, Storage>) -> Option<Self> {
|
||||
if g.is_empty() {
|
||||
None
|
||||
} else {
|
||||
let reasons = g
|
||||
.values()
|
||||
.fold(enumset::EnumSet::empty(), |acc, next| acc.union(*next));
|
||||
Some(BlockingReasons {
|
||||
timelines: g.len(),
|
||||
reasons,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -800,6 +800,123 @@ impl RemoteTimelineClient {
|
||||
.context("wait completion")
|
||||
}
|
||||
|
||||
/// Adds a gc blocking reason for this timeline if one does not exist already.
|
||||
///
|
||||
/// A retryable step of timeline detach ancestor.
|
||||
///
|
||||
/// Returns a future which waits until the completion of the upload.
|
||||
pub(crate) fn schedule_insert_gc_block_reason(
|
||||
self: &Arc<Self>,
|
||||
reason: index::GcBlockingReason,
|
||||
) -> Result<impl std::future::Future<Output = Result<(), WaitCompletionError>>, NotInitialized>
|
||||
{
|
||||
let maybe_barrier = {
|
||||
let mut guard = self.upload_queue.lock().unwrap();
|
||||
let upload_queue = guard.initialized_mut()?;
|
||||
|
||||
if let index::GcBlockingReason::DetachAncestor = reason {
|
||||
if upload_queue.dirty.metadata.ancestor_timeline().is_none() {
|
||||
drop(guard);
|
||||
panic!("cannot start detach ancestor if there is nothing to detach from");
|
||||
}
|
||||
}
|
||||
|
||||
let wanted = |x: Option<&index::GcBlocking>| x.is_some_and(|x| x.blocked_by(reason));
|
||||
|
||||
let current = upload_queue.dirty.gc_blocking.as_ref();
|
||||
let uploaded = upload_queue.clean.0.gc_blocking.as_ref();
|
||||
|
||||
match (current, uploaded) {
|
||||
(x, y) if wanted(x) && wanted(y) => None,
|
||||
(x, y) if wanted(x) && !wanted(y) => Some(self.schedule_barrier0(upload_queue)),
|
||||
// Usual case: !wanted(x) && !wanted(y)
|
||||
//
|
||||
// Unusual: !wanted(x) && wanted(y) which means we have two processes waiting to
|
||||
// turn on and off some reason.
|
||||
(x, y) => {
|
||||
if !wanted(x) && wanted(y) {
|
||||
// this could be avoided by having external in-memory synchronization, like
|
||||
// timeline detach ancestor
|
||||
warn!(?reason, op="insert", "unexpected: two racing processes to enable and disable a gc blocking reason");
|
||||
}
|
||||
|
||||
// at this point, the metadata must always show that there is a parent
|
||||
upload_queue.dirty.gc_blocking = current
|
||||
.map(|x| x.with_reason(reason))
|
||||
.or_else(|| Some(index::GcBlocking::started_now_for(reason)));
|
||||
self.schedule_index_upload(upload_queue)?;
|
||||
Some(self.schedule_barrier0(upload_queue))
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
Ok(async move {
|
||||
if let Some(barrier) = maybe_barrier {
|
||||
Self::wait_completion0(barrier).await?;
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
/// Removes a gc blocking reason for this timeline if one exists.
|
||||
///
|
||||
/// A retryable step of timeline detach ancestor.
|
||||
///
|
||||
/// Returns a future which waits until the completion of the upload.
|
||||
pub(crate) fn schedule_remove_gc_block_reason(
|
||||
self: &Arc<Self>,
|
||||
reason: index::GcBlockingReason,
|
||||
) -> Result<impl std::future::Future<Output = Result<(), WaitCompletionError>>, NotInitialized>
|
||||
{
|
||||
let maybe_barrier = {
|
||||
let mut guard = self.upload_queue.lock().unwrap();
|
||||
let upload_queue = guard.initialized_mut()?;
|
||||
|
||||
if let index::GcBlockingReason::DetachAncestor = reason {
|
||||
if !upload_queue
|
||||
.clean
|
||||
.0
|
||||
.lineage
|
||||
.is_detached_from_original_ancestor()
|
||||
{
|
||||
drop(guard);
|
||||
panic!("cannot complete timeline_ancestor_detach while not detached");
|
||||
}
|
||||
}
|
||||
|
||||
let wanted = |x: Option<&index::GcBlocking>| {
|
||||
x.is_none() || x.is_some_and(|b| !b.blocked_by(reason))
|
||||
};
|
||||
|
||||
let current = upload_queue.dirty.gc_blocking.as_ref();
|
||||
let uploaded = upload_queue.clean.0.gc_blocking.as_ref();
|
||||
|
||||
match (current, uploaded) {
|
||||
(x, y) if wanted(x) && wanted(y) => None,
|
||||
(x, y) if wanted(x) && !wanted(y) => Some(self.schedule_barrier0(upload_queue)),
|
||||
(x, y) => {
|
||||
if !wanted(x) && wanted(y) {
|
||||
warn!(?reason, op="remove", "unexpected: two racing processes to enable and disable a gc blocking reason (remove)");
|
||||
}
|
||||
|
||||
upload_queue.dirty.gc_blocking =
|
||||
current.as_ref().and_then(|x| x.without_reason(reason));
|
||||
assert!(wanted(upload_queue.dirty.gc_blocking.as_ref()));
|
||||
// FIXME: bogus ?
|
||||
self.schedule_index_upload(upload_queue)?;
|
||||
Some(self.schedule_barrier0(upload_queue))
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
Ok(async move {
|
||||
if let Some(barrier) = maybe_barrier {
|
||||
Self::wait_completion0(barrier).await?;
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
/// Launch an upload operation in the background; the file is added to be included in next
|
||||
/// `index_part.json` upload.
|
||||
pub(crate) fn schedule_layer_file_upload(
|
||||
|
||||
@@ -60,6 +60,9 @@ pub struct IndexPart {
|
||||
#[serde(default)]
|
||||
pub(crate) lineage: Lineage,
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none", default)]
|
||||
pub(crate) gc_blocking: Option<GcBlocking>,
|
||||
|
||||
/// Describes the kind of aux files stored in the timeline.
|
||||
///
|
||||
/// The value is modified during file ingestion when the latest wanted value communicated via tenant config is applied if it is acceptable.
|
||||
@@ -85,10 +88,11 @@ impl IndexPart {
|
||||
/// - 6: last_aux_file_policy is added.
|
||||
/// - 7: metadata_bytes is no longer written, but still read
|
||||
/// - 8: added `archived_at`
|
||||
const LATEST_VERSION: usize = 8;
|
||||
/// - 9: +gc_blocking
|
||||
const LATEST_VERSION: usize = 9;
|
||||
|
||||
// Versions we may see when reading from a bucket.
|
||||
pub const KNOWN_VERSIONS: &'static [usize] = &[1, 2, 3, 4, 5, 6, 7, 8];
|
||||
pub const KNOWN_VERSIONS: &'static [usize] = &[1, 2, 3, 4, 5, 6, 7, 8, 9];
|
||||
|
||||
pub const FILE_NAME: &'static str = "index_part.json";
|
||||
|
||||
@@ -101,6 +105,7 @@ impl IndexPart {
|
||||
deleted_at: None,
|
||||
archived_at: None,
|
||||
lineage: Default::default(),
|
||||
gc_blocking: None,
|
||||
last_aux_file_policy: None,
|
||||
}
|
||||
}
|
||||
@@ -251,6 +256,64 @@ impl Lineage {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub(crate) struct GcBlocking {
|
||||
pub(crate) started_at: NaiveDateTime,
|
||||
pub(crate) reasons: enumset::EnumSet<GcBlockingReason>,
|
||||
}
|
||||
|
||||
#[derive(Debug, enumset::EnumSetType, serde::Serialize, serde::Deserialize)]
|
||||
#[enumset(serialize_repr = "list")]
|
||||
pub(crate) enum GcBlockingReason {
|
||||
Manual,
|
||||
DetachAncestor,
|
||||
}
|
||||
|
||||
impl GcBlocking {
|
||||
pub(super) fn started_now_for(reason: GcBlockingReason) -> Self {
|
||||
GcBlocking {
|
||||
started_at: chrono::Utc::now().naive_utc(),
|
||||
reasons: enumset::EnumSet::only(reason),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns true if the given reason is one of the reasons why the gc is blocked.
|
||||
pub(crate) fn blocked_by(&self, reason: GcBlockingReason) -> bool {
|
||||
self.reasons.contains(reason)
|
||||
}
|
||||
|
||||
/// Returns a version of self with the given reason.
|
||||
pub(super) fn with_reason(&self, reason: GcBlockingReason) -> Self {
|
||||
assert!(!self.blocked_by(reason));
|
||||
let mut reasons = self.reasons;
|
||||
reasons.insert(reason);
|
||||
|
||||
Self {
|
||||
started_at: self.started_at,
|
||||
reasons,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a version of self without the given reason. Assumption is that if
|
||||
/// there are no more reasons, we can unblock the gc by returning `None`.
|
||||
pub(super) fn without_reason(&self, reason: GcBlockingReason) -> Option<Self> {
|
||||
assert!(self.blocked_by(reason));
|
||||
|
||||
if self.reasons.len() == 1 {
|
||||
None
|
||||
} else {
|
||||
let mut reasons = self.reasons;
|
||||
assert!(reasons.remove(reason));
|
||||
assert!(!reasons.is_empty());
|
||||
|
||||
Some(Self {
|
||||
started_at: self.started_at,
|
||||
reasons,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
@@ -292,6 +355,7 @@ mod tests {
|
||||
deleted_at: None,
|
||||
archived_at: None,
|
||||
lineage: Lineage::default(),
|
||||
gc_blocking: None,
|
||||
last_aux_file_policy: None,
|
||||
};
|
||||
|
||||
@@ -335,6 +399,7 @@ mod tests {
|
||||
deleted_at: None,
|
||||
archived_at: None,
|
||||
lineage: Lineage::default(),
|
||||
gc_blocking: None,
|
||||
last_aux_file_policy: None,
|
||||
};
|
||||
|
||||
@@ -379,6 +444,7 @@ mod tests {
|
||||
deleted_at: Some(parse_naive_datetime("2023-07-31T09:00:00.123000000")),
|
||||
archived_at: None,
|
||||
lineage: Lineage::default(),
|
||||
gc_blocking: None,
|
||||
last_aux_file_policy: None,
|
||||
};
|
||||
|
||||
@@ -426,6 +492,7 @@ mod tests {
|
||||
deleted_at: None,
|
||||
archived_at: None,
|
||||
lineage: Lineage::default(),
|
||||
gc_blocking: None,
|
||||
last_aux_file_policy: None,
|
||||
};
|
||||
|
||||
@@ -468,6 +535,7 @@ mod tests {
|
||||
deleted_at: Some(parse_naive_datetime("2023-07-31T09:00:00.123000000")),
|
||||
archived_at: None,
|
||||
lineage: Lineage::default(),
|
||||
gc_blocking: None,
|
||||
last_aux_file_policy: None,
|
||||
};
|
||||
|
||||
@@ -513,6 +581,7 @@ mod tests {
|
||||
reparenting_history: vec![TimelineId::from_str("e1bfd8c633d713d279e6fcd2bcc15b6d").unwrap()],
|
||||
original_ancestor: Some((TimelineId::from_str("e2bfd8c633d713d279e6fcd2bcc15b6d").unwrap(), Lsn::from_str("0/15A7618").unwrap(), parse_naive_datetime("2024-05-07T18:52:36.322426563"))),
|
||||
},
|
||||
gc_blocking: None,
|
||||
last_aux_file_policy: None,
|
||||
};
|
||||
|
||||
@@ -563,6 +632,7 @@ mod tests {
|
||||
reparenting_history: vec![TimelineId::from_str("e1bfd8c633d713d279e6fcd2bcc15b6d").unwrap()],
|
||||
original_ancestor: Some((TimelineId::from_str("e2bfd8c633d713d279e6fcd2bcc15b6d").unwrap(), Lsn::from_str("0/15A7618").unwrap(), parse_naive_datetime("2024-05-07T18:52:36.322426563"))),
|
||||
},
|
||||
gc_blocking: None,
|
||||
last_aux_file_policy: Some(AuxFilePolicy::V2),
|
||||
};
|
||||
|
||||
@@ -618,6 +688,7 @@ mod tests {
|
||||
deleted_at: Some(parse_naive_datetime("2023-07-31T09:00:00.123000000")),
|
||||
archived_at: None,
|
||||
lineage: Default::default(),
|
||||
gc_blocking: None,
|
||||
last_aux_file_policy: Default::default(),
|
||||
};
|
||||
|
||||
@@ -674,6 +745,7 @@ mod tests {
|
||||
deleted_at: Some(parse_naive_datetime("2023-07-31T09:00:00.123000000")),
|
||||
archived_at: Some(parse_naive_datetime("2023-04-29T09:00:00.123000000")),
|
||||
lineage: Default::default(),
|
||||
gc_blocking: None,
|
||||
last_aux_file_policy: Default::default(),
|
||||
};
|
||||
|
||||
@@ -681,6 +753,68 @@ mod tests {
|
||||
assert_eq!(part, expected);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn v9_indexpart_is_parsed() {
|
||||
let example = r#"{
|
||||
"version": 9,
|
||||
"layer_metadata":{
|
||||
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9": { "file_size": 25600000 },
|
||||
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51": { "file_size": 9007199254741001 }
|
||||
},
|
||||
"disk_consistent_lsn":"0/16960E8",
|
||||
"metadata": {
|
||||
"disk_consistent_lsn": "0/16960E8",
|
||||
"prev_record_lsn": "0/1696070",
|
||||
"ancestor_timeline": "e45a7f37d3ee2ff17dc14bf4f4e3f52e",
|
||||
"ancestor_lsn": "0/0",
|
||||
"latest_gc_cutoff_lsn": "0/1696070",
|
||||
"initdb_lsn": "0/1696070",
|
||||
"pg_version": 14
|
||||
},
|
||||
"gc_blocking": {
|
||||
"started_at": "2024-07-19T09:00:00.123",
|
||||
"reasons": ["DetachAncestor"]
|
||||
}
|
||||
}"#;
|
||||
|
||||
let expected = IndexPart {
|
||||
version: 9,
|
||||
layer_metadata: HashMap::from([
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
|
||||
file_size: 25600000,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
}),
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
|
||||
file_size: 9007199254741001,
|
||||
generation: Generation::none(),
|
||||
shard: ShardIndex::unsharded()
|
||||
})
|
||||
]),
|
||||
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
metadata: TimelineMetadata::new(
|
||||
Lsn::from_str("0/16960E8").unwrap(),
|
||||
Some(Lsn::from_str("0/1696070").unwrap()),
|
||||
Some(TimelineId::from_str("e45a7f37d3ee2ff17dc14bf4f4e3f52e").unwrap()),
|
||||
Lsn::INVALID,
|
||||
Lsn::from_str("0/1696070").unwrap(),
|
||||
Lsn::from_str("0/1696070").unwrap(),
|
||||
14,
|
||||
).with_recalculated_checksum().unwrap(),
|
||||
deleted_at: None,
|
||||
lineage: Default::default(),
|
||||
gc_blocking: Some(GcBlocking {
|
||||
started_at: parse_naive_datetime("2024-07-19T09:00:00.123000000"),
|
||||
reasons: enumset::EnumSet::from_iter([GcBlockingReason::DetachAncestor]),
|
||||
}),
|
||||
last_aux_file_policy: Default::default(),
|
||||
archived_at: None,
|
||||
};
|
||||
|
||||
let part = IndexPart::from_s3_bytes(example.as_bytes()).unwrap();
|
||||
assert_eq!(part, expected);
|
||||
}
|
||||
|
||||
fn parse_naive_datetime(s: &str) -> NaiveDateTime {
|
||||
chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S.%f").unwrap()
|
||||
}
|
||||
|
||||
@@ -8,6 +8,9 @@ mod layer_desc;
|
||||
mod layer_name;
|
||||
pub mod merge_iterator;
|
||||
|
||||
#[cfg(test)]
|
||||
pub mod split_writer;
|
||||
|
||||
use crate::context::{AccessStatsBehavior, RequestContext};
|
||||
use crate::repository::Value;
|
||||
use crate::walrecord::NeonWalRecord;
|
||||
@@ -432,21 +435,6 @@ impl ReadableLayer {
|
||||
}
|
||||
}
|
||||
|
||||
/// Return value from [`Layer::get_value_reconstruct_data`]
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
pub enum ValueReconstructResult {
|
||||
/// Got all the data needed to reconstruct the requested page
|
||||
Complete,
|
||||
/// This layer didn't contain all the required data, the caller should look up
|
||||
/// the predecessor layer at the returned LSN and collect more data from there.
|
||||
Continue,
|
||||
|
||||
/// This layer didn't contain data needed to reconstruct the page version at
|
||||
/// the returned LSN. This is usually considered an error, but might be OK
|
||||
/// in some circumstances.
|
||||
Missing,
|
||||
}
|
||||
|
||||
/// Layers contain a hint indicating whether they are likely to be used for reads. This is a hint rather
|
||||
/// than an authoritative value, so that we do not have to update it synchronously when changing the visibility
|
||||
/// of layers (for example when creating a branch that makes some previously covered layers visible). It should
|
||||
|
||||
@@ -36,7 +36,7 @@ use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, Fi
|
||||
use crate::tenant::disk_btree::{
|
||||
DiskBtreeBuilder, DiskBtreeIterator, DiskBtreeReader, VisitDirection,
|
||||
};
|
||||
use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
|
||||
use crate::tenant::storage_layer::Layer;
|
||||
use crate::tenant::timeline::GetVectoredError;
|
||||
use crate::tenant::vectored_blob_io::{
|
||||
BlobFlag, MaxVectoredReadBytes, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
|
||||
@@ -384,6 +384,9 @@ struct DeltaLayerWriterInner {
|
||||
tree: DiskBtreeBuilder<BlockBuf, DELTA_KEY_SIZE>,
|
||||
|
||||
blob_writer: BlobWriter<true>,
|
||||
|
||||
// Number of key-lsns in the layer.
|
||||
num_keys: usize,
|
||||
}
|
||||
|
||||
impl DeltaLayerWriterInner {
|
||||
@@ -425,6 +428,7 @@ impl DeltaLayerWriterInner {
|
||||
lsn_range,
|
||||
tree: tree_builder,
|
||||
blob_writer,
|
||||
num_keys: 0,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -475,6 +479,9 @@ impl DeltaLayerWriterInner {
|
||||
|
||||
let delta_key = DeltaKey::from_key_lsn(&key, lsn);
|
||||
let res = self.tree.append(&delta_key.0, blob_ref.0);
|
||||
|
||||
self.num_keys += 1;
|
||||
|
||||
(val, res.map_err(|e| anyhow::anyhow!(e)))
|
||||
}
|
||||
|
||||
@@ -686,6 +693,17 @@ impl DeltaLayerWriter {
|
||||
.finish(key_end, timeline, ctx)
|
||||
.await
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) fn num_keys(&self) -> usize {
|
||||
self.inner.as_ref().unwrap().num_keys
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) fn estimated_size(&self) -> u64 {
|
||||
let inner = self.inner.as_ref().unwrap();
|
||||
inner.blob_writer.size() + inner.tree.borrow_writer().size() + PAGE_SZ as u64
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for DeltaLayerWriter {
|
||||
@@ -808,95 +826,6 @@ impl DeltaLayerInner {
|
||||
})
|
||||
}
|
||||
|
||||
pub(super) async fn get_value_reconstruct_data(
|
||||
&self,
|
||||
key: Key,
|
||||
lsn_range: Range<Lsn>,
|
||||
reconstruct_state: &mut ValueReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<ValueReconstructResult> {
|
||||
let mut need_image = true;
|
||||
// Scan the page versions backwards, starting from `lsn`.
|
||||
let block_reader = FileBlockReader::new(&self.file, self.file_id);
|
||||
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
|
||||
self.index_start_blk,
|
||||
self.index_root_blk,
|
||||
&block_reader,
|
||||
);
|
||||
let search_key = DeltaKey::from_key_lsn(&key, Lsn(lsn_range.end.0 - 1));
|
||||
|
||||
let mut offsets: Vec<(Lsn, u64)> = Vec::new();
|
||||
|
||||
tree_reader
|
||||
.visit(
|
||||
&search_key.0,
|
||||
VisitDirection::Backwards,
|
||||
|key, value| {
|
||||
let blob_ref = BlobRef(value);
|
||||
if key[..KEY_SIZE] != search_key.0[..KEY_SIZE] {
|
||||
return false;
|
||||
}
|
||||
let entry_lsn = DeltaKey::extract_lsn_from_buf(key);
|
||||
if entry_lsn < lsn_range.start {
|
||||
return false;
|
||||
}
|
||||
offsets.push((entry_lsn, blob_ref.pos()));
|
||||
|
||||
!blob_ref.will_init()
|
||||
},
|
||||
&RequestContextBuilder::extend(ctx)
|
||||
.page_content_kind(PageContentKind::DeltaLayerBtreeNode)
|
||||
.build(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let ctx = &RequestContextBuilder::extend(ctx)
|
||||
.page_content_kind(PageContentKind::DeltaLayerValue)
|
||||
.build();
|
||||
|
||||
// Ok, 'offsets' now contains the offsets of all the entries we need to read
|
||||
let cursor = block_reader.block_cursor();
|
||||
let mut buf = Vec::new();
|
||||
for (entry_lsn, pos) in offsets {
|
||||
cursor
|
||||
.read_blob_into_buf(pos, &mut buf, ctx)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!("Failed to read blob from virtual file {}", self.file.path)
|
||||
})?;
|
||||
let val = Value::des(&buf).with_context(|| {
|
||||
format!(
|
||||
"Failed to deserialize file blob from virtual file {}",
|
||||
self.file.path
|
||||
)
|
||||
})?;
|
||||
match val {
|
||||
Value::Image(img) => {
|
||||
reconstruct_state.img = Some((entry_lsn, img));
|
||||
need_image = false;
|
||||
break;
|
||||
}
|
||||
Value::WalRecord(rec) => {
|
||||
let will_init = rec.will_init();
|
||||
reconstruct_state.records.push((entry_lsn, rec));
|
||||
if will_init {
|
||||
// This WAL record initializes the page, so no need to go further back
|
||||
need_image = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If an older page image is needed to reconstruct the page, let the
|
||||
// caller know.
|
||||
if need_image {
|
||||
Ok(ValueReconstructResult::Continue)
|
||||
} else {
|
||||
Ok(ValueReconstructResult::Complete)
|
||||
}
|
||||
}
|
||||
|
||||
// Look up the keys in the provided keyspace and update
|
||||
// the reconstruct state with whatever is found.
|
||||
//
|
||||
|
||||
@@ -32,9 +32,7 @@ use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader};
|
||||
use crate::tenant::disk_btree::{
|
||||
DiskBtreeBuilder, DiskBtreeIterator, DiskBtreeReader, VisitDirection,
|
||||
};
|
||||
use crate::tenant::storage_layer::{
|
||||
LayerAccessStats, ValueReconstructResult, ValueReconstructState,
|
||||
};
|
||||
use crate::tenant::storage_layer::LayerAccessStats;
|
||||
use crate::tenant::timeline::GetVectoredError;
|
||||
use crate::tenant::vectored_blob_io::{
|
||||
BlobFlag, MaxVectoredReadBytes, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
|
||||
@@ -429,46 +427,6 @@ impl ImageLayerInner {
|
||||
})
|
||||
}
|
||||
|
||||
pub(super) async fn get_value_reconstruct_data(
|
||||
&self,
|
||||
key: Key,
|
||||
reconstruct_state: &mut ValueReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<ValueReconstructResult> {
|
||||
let block_reader = FileBlockReader::new(&self.file, self.file_id);
|
||||
let tree_reader =
|
||||
DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, &block_reader);
|
||||
|
||||
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
|
||||
key.write_to_byte_slice(&mut keybuf);
|
||||
if let Some(offset) = tree_reader
|
||||
.get(
|
||||
&keybuf,
|
||||
&RequestContextBuilder::extend(ctx)
|
||||
.page_content_kind(PageContentKind::ImageLayerBtreeNode)
|
||||
.build(),
|
||||
)
|
||||
.await?
|
||||
{
|
||||
let blob = block_reader
|
||||
.block_cursor()
|
||||
.read_blob(
|
||||
offset,
|
||||
&RequestContextBuilder::extend(ctx)
|
||||
.page_content_kind(PageContentKind::ImageLayerValue)
|
||||
.build(),
|
||||
)
|
||||
.await
|
||||
.with_context(|| format!("failed to read value from offset {}", offset))?;
|
||||
let value = Bytes::from(blob);
|
||||
|
||||
reconstruct_state.img = Some((self.lsn, value));
|
||||
Ok(ValueReconstructResult::Complete)
|
||||
} else {
|
||||
Ok(ValueReconstructResult::Missing)
|
||||
}
|
||||
}
|
||||
|
||||
// Look up the keys in the provided keyspace and update
|
||||
// the reconstruct state with whatever is found.
|
||||
pub(super) async fn get_values_reconstruct_data(
|
||||
@@ -742,11 +700,21 @@ struct ImageLayerWriterInner {
|
||||
// where we have chosen their compressed form
|
||||
uncompressed_bytes_chosen: u64,
|
||||
|
||||
// Number of keys in the layer.
|
||||
num_keys: usize,
|
||||
|
||||
blob_writer: BlobWriter<false>,
|
||||
tree: DiskBtreeBuilder<BlockBuf, KEY_SIZE>,
|
||||
|
||||
#[cfg_attr(not(feature = "testing"), allow(dead_code))]
|
||||
last_written_key: Key,
|
||||
}
|
||||
|
||||
impl ImageLayerWriterInner {
|
||||
fn size(&self) -> u64 {
|
||||
self.tree.borrow_writer().size() + self.blob_writer.size()
|
||||
}
|
||||
|
||||
///
|
||||
/// Start building a new image layer.
|
||||
///
|
||||
@@ -800,6 +768,8 @@ impl ImageLayerWriterInner {
|
||||
uncompressed_bytes: 0,
|
||||
uncompressed_bytes_eligible: 0,
|
||||
uncompressed_bytes_chosen: 0,
|
||||
num_keys: 0,
|
||||
last_written_key: Key::MIN,
|
||||
};
|
||||
|
||||
Ok(writer)
|
||||
@@ -820,6 +790,7 @@ impl ImageLayerWriterInner {
|
||||
let compression = self.conf.image_compression;
|
||||
let uncompressed_len = img.len() as u64;
|
||||
self.uncompressed_bytes += uncompressed_len;
|
||||
self.num_keys += 1;
|
||||
let (_img, res) = self
|
||||
.blob_writer
|
||||
.write_blob_maybe_compressed(img, ctx, compression)
|
||||
@@ -839,6 +810,11 @@ impl ImageLayerWriterInner {
|
||||
key.write_to_byte_slice(&mut keybuf);
|
||||
self.tree.append(&keybuf, off)?;
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
{
|
||||
self.last_written_key = key;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -849,6 +825,7 @@ impl ImageLayerWriterInner {
|
||||
self,
|
||||
timeline: &Arc<Timeline>,
|
||||
ctx: &RequestContext,
|
||||
end_key: Option<Key>,
|
||||
) -> anyhow::Result<ResidentLayer> {
|
||||
let index_start_blk =
|
||||
((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
|
||||
@@ -899,11 +876,23 @@ impl ImageLayerWriterInner {
|
||||
let desc = PersistentLayerDesc::new_img(
|
||||
self.tenant_shard_id,
|
||||
self.timeline_id,
|
||||
self.key_range.clone(),
|
||||
if let Some(end_key) = end_key {
|
||||
self.key_range.start..end_key
|
||||
} else {
|
||||
self.key_range.clone()
|
||||
},
|
||||
self.lsn,
|
||||
metadata.len(),
|
||||
);
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
if let Some(end_key) = end_key {
|
||||
assert!(
|
||||
self.last_written_key < end_key,
|
||||
"written key violates end_key range"
|
||||
);
|
||||
}
|
||||
|
||||
// Note: Because we open the file in write-only mode, we cannot
|
||||
// reuse the same VirtualFile for reading later. That's why we don't
|
||||
// set inner.file here. The first read will have to re-open it.
|
||||
@@ -980,6 +969,18 @@ impl ImageLayerWriter {
|
||||
self.inner.as_mut().unwrap().put_image(key, img, ctx).await
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
/// Estimated size of the image layer.
|
||||
pub(crate) fn estimated_size(&self) -> u64 {
|
||||
let inner = self.inner.as_ref().unwrap();
|
||||
inner.blob_writer.size() + inner.tree.borrow_writer().size() + PAGE_SZ as u64
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) fn num_keys(&self) -> usize {
|
||||
self.inner.as_ref().unwrap().num_keys
|
||||
}
|
||||
|
||||
///
|
||||
/// Finish writing the image layer.
|
||||
///
|
||||
@@ -988,7 +989,26 @@ impl ImageLayerWriter {
|
||||
timeline: &Arc<Timeline>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<super::ResidentLayer> {
|
||||
self.inner.take().unwrap().finish(timeline, ctx).await
|
||||
self.inner.take().unwrap().finish(timeline, ctx, None).await
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
/// Finish writing the image layer with an end key, used in [`super::split_writer::SplitImageLayerWriter`]. The end key determines the end of the image layer's covered range and is exclusive.
|
||||
pub(super) async fn finish_with_end_key(
|
||||
mut self,
|
||||
timeline: &Arc<Timeline>,
|
||||
end_key: Key,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<super::ResidentLayer> {
|
||||
self.inner
|
||||
.take()
|
||||
.unwrap()
|
||||
.finish(timeline, ctx, Some(end_key))
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) fn size(&self) -> u64 {
|
||||
self.inner.as_ref().unwrap().size()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -10,11 +10,10 @@ use crate::page_cache::PAGE_SZ;
|
||||
use crate::repository::{Key, Value};
|
||||
use crate::tenant::block_io::{BlockCursor, BlockReader, BlockReaderRef};
|
||||
use crate::tenant::ephemeral_file::EphemeralFile;
|
||||
use crate::tenant::storage_layer::ValueReconstructResult;
|
||||
use crate::tenant::timeline::GetVectoredError;
|
||||
use crate::tenant::{PageReconstructError, Timeline};
|
||||
use crate::{l0_flush, page_cache, walrecord};
|
||||
use anyhow::{anyhow, ensure, Result};
|
||||
use anyhow::{anyhow, Result};
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::models::InMemoryLayerInfo;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
@@ -33,10 +32,7 @@ use std::sync::atomic::Ordering as AtomicOrdering;
|
||||
use std::sync::atomic::{AtomicU64, AtomicUsize};
|
||||
use tokio::sync::{RwLock, RwLockWriteGuard};
|
||||
|
||||
use super::{
|
||||
DeltaLayerWriter, ResidentLayer, ValueReconstructSituation, ValueReconstructState,
|
||||
ValuesReconstructState,
|
||||
};
|
||||
use super::{DeltaLayerWriter, ResidentLayer, ValueReconstructSituation, ValuesReconstructState};
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Copy, Hash)]
|
||||
pub(crate) struct InMemoryLayerFileId(page_cache::FileId);
|
||||
@@ -55,9 +51,6 @@ pub struct InMemoryLayer {
|
||||
/// Writes are only allowed when this is `None`.
|
||||
pub(crate) end_lsn: OnceLock<Lsn>,
|
||||
|
||||
/// Used for traversal path. Cached representation of the in-memory layer before frozen.
|
||||
local_path_str: Arc<str>,
|
||||
|
||||
/// Used for traversal path. Cached representation of the in-memory layer after frozen.
|
||||
frozen_local_path_str: OnceLock<Arc<str>>,
|
||||
|
||||
@@ -248,12 +241,6 @@ impl InMemoryLayer {
|
||||
self.start_lsn..self.end_lsn_or_max()
|
||||
}
|
||||
|
||||
pub(crate) fn local_path_str(&self) -> &Arc<str> {
|
||||
self.frozen_local_path_str
|
||||
.get()
|
||||
.unwrap_or(&self.local_path_str)
|
||||
}
|
||||
|
||||
/// debugging function to print out the contents of the layer
|
||||
///
|
||||
/// this is likely completly unused
|
||||
@@ -303,60 +290,6 @@ impl InMemoryLayer {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Look up given value in the layer.
|
||||
pub(crate) async fn get_value_reconstruct_data(
|
||||
&self,
|
||||
key: Key,
|
||||
lsn_range: Range<Lsn>,
|
||||
reconstruct_state: &mut ValueReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<ValueReconstructResult> {
|
||||
ensure!(lsn_range.start >= self.start_lsn);
|
||||
let mut need_image = true;
|
||||
|
||||
let ctx = RequestContextBuilder::extend(ctx)
|
||||
.page_content_kind(PageContentKind::InMemoryLayer)
|
||||
.build();
|
||||
|
||||
let inner = self.inner.read().await;
|
||||
|
||||
let reader = inner.file.block_cursor();
|
||||
|
||||
// Scan the page versions backwards, starting from `lsn`.
|
||||
if let Some(vec_map) = inner.index.get(&key) {
|
||||
let slice = vec_map.slice_range(lsn_range);
|
||||
for (entry_lsn, pos) in slice.iter().rev() {
|
||||
let buf = reader.read_blob(*pos, &ctx).await?;
|
||||
let value = Value::des(&buf)?;
|
||||
match value {
|
||||
Value::Image(img) => {
|
||||
reconstruct_state.img = Some((*entry_lsn, img));
|
||||
return Ok(ValueReconstructResult::Complete);
|
||||
}
|
||||
Value::WalRecord(rec) => {
|
||||
let will_init = rec.will_init();
|
||||
reconstruct_state.records.push((*entry_lsn, rec));
|
||||
if will_init {
|
||||
// This WAL record initializes the page, so no need to go further back
|
||||
need_image = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// release lock on 'inner'
|
||||
|
||||
// If an older page image is needed to reconstruct the page, let the
|
||||
// caller know.
|
||||
if need_image {
|
||||
Ok(ValueReconstructResult::Continue)
|
||||
} else {
|
||||
Ok(ValueReconstructResult::Complete)
|
||||
}
|
||||
}
|
||||
|
||||
// Look up the keys in the provided keyspace and update
|
||||
// the reconstruct state with whatever is found.
|
||||
//
|
||||
@@ -458,11 +391,6 @@ impl InMemoryLayer {
|
||||
|
||||
Ok(InMemoryLayer {
|
||||
file_id: key,
|
||||
local_path_str: {
|
||||
let mut buf = String::new();
|
||||
inmem_layer_log_display(&mut buf, timeline_id, start_lsn, Lsn::MAX).unwrap();
|
||||
buf.into()
|
||||
},
|
||||
frozen_local_path_str: OnceLock::new(),
|
||||
conf,
|
||||
timeline_id,
|
||||
|
||||
@@ -24,8 +24,7 @@ use super::delta_layer::{self, DeltaEntry};
|
||||
use super::image_layer::{self};
|
||||
use super::{
|
||||
AsLayerDesc, ImageLayerWriter, LayerAccessStats, LayerAccessStatsReset, LayerName,
|
||||
LayerVisibilityHint, PersistentLayerDesc, ValueReconstructResult, ValueReconstructState,
|
||||
ValuesReconstructState,
|
||||
LayerVisibilityHint, PersistentLayerDesc, ValuesReconstructState,
|
||||
};
|
||||
|
||||
use utils::generation::Generation;
|
||||
@@ -301,42 +300,6 @@ impl Layer {
|
||||
self.0.delete_on_drop();
|
||||
}
|
||||
|
||||
/// Return data needed to reconstruct given page at LSN.
|
||||
///
|
||||
/// It is up to the caller to collect more data from the previous layer and
|
||||
/// perform WAL redo, if necessary.
|
||||
///
|
||||
/// # Cancellation-Safety
|
||||
///
|
||||
/// This method is cancellation-safe.
|
||||
pub(crate) async fn get_value_reconstruct_data(
|
||||
&self,
|
||||
key: Key,
|
||||
lsn_range: Range<Lsn>,
|
||||
reconstruct_data: &mut ValueReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<ValueReconstructResult> {
|
||||
use anyhow::ensure;
|
||||
|
||||
let layer = self.0.get_or_maybe_download(true, Some(ctx)).await?;
|
||||
self.0.access_stats.record_access(ctx);
|
||||
|
||||
if self.layer_desc().is_delta {
|
||||
ensure!(lsn_range.start >= self.layer_desc().lsn_range.start);
|
||||
ensure!(self.layer_desc().key_range.contains(&key));
|
||||
} else {
|
||||
ensure!(self.layer_desc().key_range.contains(&key));
|
||||
ensure!(lsn_range.start >= self.layer_desc().image_layer_lsn());
|
||||
ensure!(lsn_range.end >= self.layer_desc().image_layer_lsn());
|
||||
}
|
||||
|
||||
layer
|
||||
.get_value_reconstruct_data(key, lsn_range, reconstruct_data, &self.0, ctx)
|
||||
.instrument(tracing::debug_span!("get_value_reconstruct_data", layer=%self))
|
||||
.await
|
||||
.with_context(|| format!("get_value_reconstruct_data for layer {self}"))
|
||||
}
|
||||
|
||||
pub(crate) async fn get_values_reconstruct_data(
|
||||
&self,
|
||||
keyspace: KeySpace,
|
||||
@@ -441,10 +404,6 @@ impl Layer {
|
||||
&self.0.path
|
||||
}
|
||||
|
||||
pub(crate) fn debug_str(&self) -> &Arc<str> {
|
||||
&self.0.debug_str
|
||||
}
|
||||
|
||||
pub(crate) fn metadata(&self) -> LayerFileMetadata {
|
||||
self.0.metadata()
|
||||
}
|
||||
@@ -519,7 +478,7 @@ impl Layer {
|
||||
///
|
||||
/// However when we want something evicted, we cannot evict it right away as there might be current
|
||||
/// reads happening on it. For example: it has been searched from [`LayerMap::search`] but not yet
|
||||
/// read with [`Layer::get_value_reconstruct_data`].
|
||||
/// read with [`Layer::get_values_reconstruct_data`].
|
||||
///
|
||||
/// [`LayerMap::search`]: crate::tenant::layer_map::LayerMap::search
|
||||
#[derive(Debug)]
|
||||
@@ -600,9 +559,6 @@ struct LayerInner {
|
||||
/// Full path to the file; unclear if this should exist anymore.
|
||||
path: Utf8PathBuf,
|
||||
|
||||
/// String representation of the layer, used for traversal id.
|
||||
debug_str: Arc<str>,
|
||||
|
||||
desc: PersistentLayerDesc,
|
||||
|
||||
/// Timeline access is needed for remote timeline client and metrics.
|
||||
@@ -836,9 +792,6 @@ impl LayerInner {
|
||||
|
||||
LayerInner {
|
||||
conf,
|
||||
debug_str: {
|
||||
format!("timelines/{}/{}", timeline.timeline_id, desc.layer_name()).into()
|
||||
},
|
||||
path: local_path,
|
||||
desc,
|
||||
timeline: Arc::downgrade(timeline),
|
||||
@@ -1759,28 +1712,6 @@ impl DownloadedLayer {
|
||||
.map_err(|e| anyhow::anyhow!("layer load failed earlier: {e}"))
|
||||
}
|
||||
|
||||
async fn get_value_reconstruct_data(
|
||||
&self,
|
||||
key: Key,
|
||||
lsn_range: Range<Lsn>,
|
||||
reconstruct_data: &mut ValueReconstructState,
|
||||
owner: &Arc<LayerInner>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<ValueReconstructResult> {
|
||||
use LayerKind::*;
|
||||
|
||||
match self.get(owner, ctx).await? {
|
||||
Delta(d) => {
|
||||
d.get_value_reconstruct_data(key, lsn_range, reconstruct_data, ctx)
|
||||
.await
|
||||
}
|
||||
Image(i) => {
|
||||
i.get_value_reconstruct_data(key, reconstruct_data, ctx)
|
||||
.await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_values_reconstruct_data(
|
||||
&self,
|
||||
keyspace: KeySpace,
|
||||
|
||||
@@ -50,13 +50,26 @@ async fn smoke_test() {
|
||||
// all layers created at pageserver are like `layer`, initialized with strong
|
||||
// Arc<DownloadedLayer>.
|
||||
|
||||
let controlfile_keyspace = KeySpace {
|
||||
ranges: vec![CONTROLFILE_KEY..CONTROLFILE_KEY.next()],
|
||||
};
|
||||
|
||||
let img_before = {
|
||||
let mut data = ValueReconstructState::default();
|
||||
let mut data = ValuesReconstructState::default();
|
||||
layer
|
||||
.get_value_reconstruct_data(CONTROLFILE_KEY, Lsn(0x10)..Lsn(0x11), &mut data, &ctx)
|
||||
.get_values_reconstruct_data(
|
||||
controlfile_keyspace.clone(),
|
||||
Lsn(0x10)..Lsn(0x11),
|
||||
&mut data,
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
data.img
|
||||
data.keys
|
||||
.remove(&CONTROLFILE_KEY)
|
||||
.expect("must be present")
|
||||
.expect("should not error")
|
||||
.img
|
||||
.take()
|
||||
.expect("tenant harness writes the control file")
|
||||
};
|
||||
@@ -74,13 +87,24 @@ async fn smoke_test() {
|
||||
|
||||
// on accesses when the layer is evicted, it will automatically be downloaded.
|
||||
let img_after = {
|
||||
let mut data = ValueReconstructState::default();
|
||||
let mut data = ValuesReconstructState::default();
|
||||
layer
|
||||
.get_value_reconstruct_data(CONTROLFILE_KEY, Lsn(0x10)..Lsn(0x11), &mut data, &ctx)
|
||||
.get_values_reconstruct_data(
|
||||
controlfile_keyspace.clone(),
|
||||
Lsn(0x10)..Lsn(0x11),
|
||||
&mut data,
|
||||
&ctx,
|
||||
)
|
||||
.instrument(download_span.clone())
|
||||
.await
|
||||
.unwrap();
|
||||
data.img.take().unwrap()
|
||||
data.keys
|
||||
.remove(&CONTROLFILE_KEY)
|
||||
.expect("must be present")
|
||||
.expect("should not error")
|
||||
.img
|
||||
.take()
|
||||
.expect("tenant harness writes the control file")
|
||||
};
|
||||
|
||||
assert_eq!(img_before, img_after);
|
||||
@@ -830,7 +854,7 @@ async fn eviction_cancellation_on_drop() {
|
||||
fn layer_size() {
|
||||
assert_eq!(size_of::<LayerAccessStats>(), 8);
|
||||
assert_eq!(size_of::<PersistentLayerDesc>(), 104);
|
||||
assert_eq!(size_of::<LayerInner>(), 312);
|
||||
assert_eq!(size_of::<LayerInner>(), 296);
|
||||
// it also has the utf8 path
|
||||
}
|
||||
|
||||
|
||||
449
pageserver/src/tenant/storage_layer/split_writer.rs
Normal file
449
pageserver/src/tenant/storage_layer/split_writer.rs
Normal file
@@ -0,0 +1,449 @@
|
||||
use std::{ops::Range, sync::Arc};
|
||||
|
||||
use bytes::Bytes;
|
||||
use pageserver_api::key::{Key, KEY_SIZE};
|
||||
use utils::{id::TimelineId, lsn::Lsn, shard::TenantShardId};
|
||||
|
||||
use crate::{config::PageServerConf, context::RequestContext, repository::Value, tenant::Timeline};
|
||||
|
||||
use super::{DeltaLayerWriter, ImageLayerWriter, ResidentLayer};
|
||||
|
||||
/// An image writer that takes images and produces multiple image layers. The interface does not
|
||||
/// guarantee atomicity (i.e., if the image layer generation fails, there might be leftover files
|
||||
/// to be cleaned up)
|
||||
#[must_use]
|
||||
pub struct SplitImageLayerWriter {
|
||||
inner: ImageLayerWriter,
|
||||
target_layer_size: u64,
|
||||
generated_layers: Vec<ResidentLayer>,
|
||||
conf: &'static PageServerConf,
|
||||
timeline_id: TimelineId,
|
||||
tenant_shard_id: TenantShardId,
|
||||
lsn: Lsn,
|
||||
}
|
||||
|
||||
impl SplitImageLayerWriter {
|
||||
pub async fn new(
|
||||
conf: &'static PageServerConf,
|
||||
timeline_id: TimelineId,
|
||||
tenant_shard_id: TenantShardId,
|
||||
start_key: Key,
|
||||
lsn: Lsn,
|
||||
target_layer_size: u64,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Self> {
|
||||
Ok(Self {
|
||||
target_layer_size,
|
||||
inner: ImageLayerWriter::new(
|
||||
conf,
|
||||
timeline_id,
|
||||
tenant_shard_id,
|
||||
&(start_key..Key::MAX),
|
||||
lsn,
|
||||
ctx,
|
||||
)
|
||||
.await?,
|
||||
generated_layers: Vec::new(),
|
||||
conf,
|
||||
timeline_id,
|
||||
tenant_shard_id,
|
||||
lsn,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn put_image(
|
||||
&mut self,
|
||||
key: Key,
|
||||
img: Bytes,
|
||||
tline: &Arc<Timeline>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
// The current estimation is an upper bound of the space that the key/image could take
|
||||
// because we did not consider compression in this estimation. The resulting image layer
|
||||
// could be smaller than the target size.
|
||||
let addition_size_estimation = KEY_SIZE as u64 + img.len() as u64;
|
||||
if self.inner.num_keys() >= 1
|
||||
&& self.inner.estimated_size() + addition_size_estimation >= self.target_layer_size
|
||||
{
|
||||
let next_image_writer = ImageLayerWriter::new(
|
||||
self.conf,
|
||||
self.timeline_id,
|
||||
self.tenant_shard_id,
|
||||
&(key..Key::MAX),
|
||||
self.lsn,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer);
|
||||
self.generated_layers.push(
|
||||
prev_image_writer
|
||||
.finish_with_end_key(tline, key, ctx)
|
||||
.await?,
|
||||
);
|
||||
}
|
||||
self.inner.put_image(key, img, ctx).await
|
||||
}
|
||||
|
||||
pub(crate) async fn finish(
|
||||
self,
|
||||
tline: &Arc<Timeline>,
|
||||
ctx: &RequestContext,
|
||||
end_key: Key,
|
||||
) -> anyhow::Result<Vec<ResidentLayer>> {
|
||||
let Self {
|
||||
mut generated_layers,
|
||||
inner,
|
||||
..
|
||||
} = self;
|
||||
generated_layers.push(inner.finish_with_end_key(tline, end_key, ctx).await?);
|
||||
Ok(generated_layers)
|
||||
}
|
||||
|
||||
/// When split writer fails, the caller should call this function and handle partially generated layers.
|
||||
#[allow(dead_code)]
|
||||
pub(crate) async fn take(self) -> anyhow::Result<(Vec<ResidentLayer>, ImageLayerWriter)> {
|
||||
Ok((self.generated_layers, self.inner))
|
||||
}
|
||||
}
|
||||
|
||||
/// A delta writer that takes key-lsn-values and produces multiple delta layers. The interface does not
|
||||
/// guarantee atomicity (i.e., if the delta layer generation fails, there might be leftover files
|
||||
/// to be cleaned up).
|
||||
#[must_use]
|
||||
pub struct SplitDeltaLayerWriter {
|
||||
inner: DeltaLayerWriter,
|
||||
target_layer_size: u64,
|
||||
generated_layers: Vec<ResidentLayer>,
|
||||
conf: &'static PageServerConf,
|
||||
timeline_id: TimelineId,
|
||||
tenant_shard_id: TenantShardId,
|
||||
lsn_range: Range<Lsn>,
|
||||
}
|
||||
|
||||
impl SplitDeltaLayerWriter {
|
||||
pub async fn new(
|
||||
conf: &'static PageServerConf,
|
||||
timeline_id: TimelineId,
|
||||
tenant_shard_id: TenantShardId,
|
||||
start_key: Key,
|
||||
lsn_range: Range<Lsn>,
|
||||
target_layer_size: u64,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Self> {
|
||||
Ok(Self {
|
||||
target_layer_size,
|
||||
inner: DeltaLayerWriter::new(
|
||||
conf,
|
||||
timeline_id,
|
||||
tenant_shard_id,
|
||||
start_key,
|
||||
lsn_range.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?,
|
||||
generated_layers: Vec::new(),
|
||||
conf,
|
||||
timeline_id,
|
||||
tenant_shard_id,
|
||||
lsn_range,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn put_value(
|
||||
&mut self,
|
||||
key: Key,
|
||||
lsn: Lsn,
|
||||
val: Value,
|
||||
tline: &Arc<Timeline>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
// The current estimation is key size plus LSN size plus value size estimation. This is not an accurate
|
||||
// number, and therefore the final layer size could be a little bit larger or smaller than the target.
|
||||
let addition_size_estimation = KEY_SIZE as u64 + 8 /* LSN u64 size */ + 80 /* value size estimation */;
|
||||
if self.inner.num_keys() >= 1
|
||||
&& self.inner.estimated_size() + addition_size_estimation >= self.target_layer_size
|
||||
{
|
||||
let next_delta_writer = DeltaLayerWriter::new(
|
||||
self.conf,
|
||||
self.timeline_id,
|
||||
self.tenant_shard_id,
|
||||
key,
|
||||
self.lsn_range.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
let prev_delta_writer = std::mem::replace(&mut self.inner, next_delta_writer);
|
||||
self.generated_layers
|
||||
.push(prev_delta_writer.finish(key, tline, ctx).await?);
|
||||
}
|
||||
self.inner.put_value(key, lsn, val, ctx).await
|
||||
}
|
||||
|
||||
pub(crate) async fn finish(
|
||||
self,
|
||||
tline: &Arc<Timeline>,
|
||||
ctx: &RequestContext,
|
||||
end_key: Key,
|
||||
) -> anyhow::Result<Vec<ResidentLayer>> {
|
||||
let Self {
|
||||
mut generated_layers,
|
||||
inner,
|
||||
..
|
||||
} = self;
|
||||
generated_layers.push(inner.finish(end_key, tline, ctx).await?);
|
||||
Ok(generated_layers)
|
||||
}
|
||||
|
||||
/// When split writer fails, the caller should call this function and handle partially generated layers.
|
||||
#[allow(dead_code)]
|
||||
pub(crate) async fn take(self) -> anyhow::Result<(Vec<ResidentLayer>, DeltaLayerWriter)> {
|
||||
Ok((self.generated_layers, self.inner))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::{
|
||||
tenant::{
|
||||
harness::{TenantHarness, TIMELINE_ID},
|
||||
storage_layer::AsLayerDesc,
|
||||
},
|
||||
DEFAULT_PG_VERSION,
|
||||
};
|
||||
|
||||
use super::*;
|
||||
|
||||
fn get_key(id: u32) -> Key {
|
||||
let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
|
||||
key.field6 = id;
|
||||
key
|
||||
}
|
||||
|
||||
fn get_img(id: u32) -> Bytes {
|
||||
format!("{id:064}").into()
|
||||
}
|
||||
|
||||
fn get_large_img() -> Bytes {
|
||||
vec![0; 8192].into()
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn write_one_image() {
|
||||
let harness = TenantHarness::create("split_writer_write_one_image")
|
||||
.await
|
||||
.unwrap();
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
|
||||
let tline = tenant
|
||||
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut image_writer = SplitImageLayerWriter::new(
|
||||
tenant.conf,
|
||||
tline.timeline_id,
|
||||
tenant.tenant_shard_id,
|
||||
get_key(0),
|
||||
Lsn(0x18),
|
||||
4 * 1024 * 1024,
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut delta_writer = SplitDeltaLayerWriter::new(
|
||||
tenant.conf,
|
||||
tline.timeline_id,
|
||||
tenant.tenant_shard_id,
|
||||
get_key(0),
|
||||
Lsn(0x18)..Lsn(0x20),
|
||||
4 * 1024 * 1024,
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
image_writer
|
||||
.put_image(get_key(0), get_img(0), &tline, &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
let layers = image_writer
|
||||
.finish(&tline, &ctx, get_key(10))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(layers.len(), 1);
|
||||
|
||||
delta_writer
|
||||
.put_value(
|
||||
get_key(0),
|
||||
Lsn(0x18),
|
||||
Value::Image(get_img(0)),
|
||||
&tline,
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let layers = delta_writer
|
||||
.finish(&tline, &ctx, get_key(10))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(layers.len(), 1);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn write_split() {
|
||||
let harness = TenantHarness::create("split_writer_write_split")
|
||||
.await
|
||||
.unwrap();
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
|
||||
let tline = tenant
|
||||
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut image_writer = SplitImageLayerWriter::new(
|
||||
tenant.conf,
|
||||
tline.timeline_id,
|
||||
tenant.tenant_shard_id,
|
||||
get_key(0),
|
||||
Lsn(0x18),
|
||||
4 * 1024 * 1024,
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let mut delta_writer = SplitDeltaLayerWriter::new(
|
||||
tenant.conf,
|
||||
tline.timeline_id,
|
||||
tenant.tenant_shard_id,
|
||||
get_key(0),
|
||||
Lsn(0x18)..Lsn(0x20),
|
||||
4 * 1024 * 1024,
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
const N: usize = 2000;
|
||||
for i in 0..N {
|
||||
let i = i as u32;
|
||||
image_writer
|
||||
.put_image(get_key(i), get_large_img(), &tline, &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
delta_writer
|
||||
.put_value(
|
||||
get_key(i),
|
||||
Lsn(0x20),
|
||||
Value::Image(get_large_img()),
|
||||
&tline,
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
let image_layers = image_writer
|
||||
.finish(&tline, &ctx, get_key(N as u32))
|
||||
.await
|
||||
.unwrap();
|
||||
let delta_layers = delta_writer
|
||||
.finish(&tline, &ctx, get_key(N as u32))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(image_layers.len(), N / 512 + 1);
|
||||
assert_eq!(delta_layers.len(), N / 512 + 1);
|
||||
for idx in 0..image_layers.len() {
|
||||
assert_ne!(image_layers[idx].layer_desc().key_range.start, Key::MIN);
|
||||
assert_ne!(image_layers[idx].layer_desc().key_range.end, Key::MAX);
|
||||
assert_ne!(delta_layers[idx].layer_desc().key_range.start, Key::MIN);
|
||||
assert_ne!(delta_layers[idx].layer_desc().key_range.end, Key::MAX);
|
||||
if idx > 0 {
|
||||
assert_eq!(
|
||||
image_layers[idx - 1].layer_desc().key_range.end,
|
||||
image_layers[idx].layer_desc().key_range.start
|
||||
);
|
||||
assert_eq!(
|
||||
delta_layers[idx - 1].layer_desc().key_range.end,
|
||||
delta_layers[idx].layer_desc().key_range.start
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn write_large_img() {
|
||||
let harness = TenantHarness::create("split_writer_write_large_img")
|
||||
.await
|
||||
.unwrap();
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
|
||||
let tline = tenant
|
||||
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut image_writer = SplitImageLayerWriter::new(
|
||||
tenant.conf,
|
||||
tline.timeline_id,
|
||||
tenant.tenant_shard_id,
|
||||
get_key(0),
|
||||
Lsn(0x18),
|
||||
4 * 1024,
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut delta_writer = SplitDeltaLayerWriter::new(
|
||||
tenant.conf,
|
||||
tline.timeline_id,
|
||||
tenant.tenant_shard_id,
|
||||
get_key(0),
|
||||
Lsn(0x18)..Lsn(0x20),
|
||||
4 * 1024,
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
image_writer
|
||||
.put_image(get_key(0), get_img(0), &tline, &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
image_writer
|
||||
.put_image(get_key(1), get_large_img(), &tline, &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
let layers = image_writer
|
||||
.finish(&tline, &ctx, get_key(10))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(layers.len(), 2);
|
||||
|
||||
delta_writer
|
||||
.put_value(
|
||||
get_key(0),
|
||||
Lsn(0x18),
|
||||
Value::Image(get_img(0)),
|
||||
&tline,
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
delta_writer
|
||||
.put_value(
|
||||
get_key(1),
|
||||
Lsn(0x1A),
|
||||
Value::Image(get_large_img()),
|
||||
&tline,
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let layers = delta_writer
|
||||
.finish(&tline, &ctx, get_key(10))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(layers.len(), 2);
|
||||
}
|
||||
}
|
||||
@@ -22,8 +22,8 @@ use handle::ShardTimelineId;
|
||||
use once_cell::sync::Lazy;
|
||||
use pageserver_api::{
|
||||
key::{
|
||||
AUX_FILES_KEY, KEY_SIZE, METADATA_KEY_BEGIN_PREFIX, METADATA_KEY_END_PREFIX,
|
||||
NON_INHERITED_RANGE, NON_INHERITED_SPARSE_RANGE,
|
||||
KEY_SIZE, METADATA_KEY_BEGIN_PREFIX, METADATA_KEY_END_PREFIX, NON_INHERITED_RANGE,
|
||||
NON_INHERITED_SPARSE_RANGE,
|
||||
},
|
||||
keyspace::{KeySpaceAccum, KeySpaceRandomAccum, SparseKeyPartitioning},
|
||||
models::{
|
||||
@@ -59,10 +59,7 @@ use std::{
|
||||
collections::{BTreeMap, HashMap, HashSet},
|
||||
sync::atomic::AtomicU64,
|
||||
};
|
||||
use std::{
|
||||
cmp::{max, min},
|
||||
ops::ControlFlow,
|
||||
};
|
||||
use std::{cmp::min, ops::ControlFlow};
|
||||
use std::{
|
||||
collections::btree_map::Entry,
|
||||
ops::{Deref, Range},
|
||||
@@ -87,8 +84,8 @@ use crate::{
|
||||
disk_usage_eviction_task::finite_f32,
|
||||
tenant::storage_layer::{
|
||||
AsLayerDesc, DeltaLayerWriter, EvictionError, ImageLayerWriter, InMemoryLayer, Layer,
|
||||
LayerAccessStatsReset, LayerName, ResidentLayer, ValueReconstructResult,
|
||||
ValueReconstructState, ValuesReconstructState,
|
||||
LayerAccessStatsReset, LayerName, ResidentLayer, ValueReconstructState,
|
||||
ValuesReconstructState,
|
||||
},
|
||||
};
|
||||
use crate::{
|
||||
@@ -143,7 +140,10 @@ use self::walreceiver::{WalReceiver, WalReceiverConf};
|
||||
use super::{config::TenantConf, upload_queue::NotInitialized};
|
||||
use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
|
||||
use super::{remote_timeline_client::index::IndexPart, storage_layer::LayerFringe};
|
||||
use super::{remote_timeline_client::RemoteTimelineClient, storage_layer::ReadableLayer};
|
||||
use super::{
|
||||
remote_timeline_client::RemoteTimelineClient, remote_timeline_client::WaitCompletionError,
|
||||
storage_layer::ReadableLayer,
|
||||
};
|
||||
use super::{
|
||||
secondary::heatmap::{HeatMapLayer, HeatMapTimeline},
|
||||
GcError,
|
||||
@@ -540,7 +540,6 @@ pub struct MissingKeyError {
|
||||
cont_lsn: Lsn,
|
||||
request_lsn: Lsn,
|
||||
ancestor_lsn: Option<Lsn>,
|
||||
traversal_path: Vec<TraversalPathItem>,
|
||||
backtrace: Option<std::backtrace::Backtrace>,
|
||||
}
|
||||
|
||||
@@ -561,18 +560,6 @@ impl std::fmt::Display for MissingKeyError {
|
||||
write!(f, ", ancestor {}", ancestor_lsn)?;
|
||||
}
|
||||
|
||||
if !self.traversal_path.is_empty() {
|
||||
writeln!(f)?;
|
||||
}
|
||||
|
||||
for (r, c, l) in &self.traversal_path {
|
||||
writeln!(
|
||||
f,
|
||||
"layer traversal: result {:?}, cont_lsn {}, layer: {}",
|
||||
r, c, l,
|
||||
)?;
|
||||
}
|
||||
|
||||
if let Some(ref backtrace) = self.backtrace {
|
||||
write!(f, "\n{}", backtrace)?;
|
||||
}
|
||||
@@ -701,6 +688,7 @@ pub(crate) enum CompactFlags {
|
||||
ForceRepartition,
|
||||
ForceImageLayerCreation,
|
||||
EnhancedGcBottomMostCompaction,
|
||||
DryRun,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for Timeline {
|
||||
@@ -914,119 +902,44 @@ impl Timeline {
|
||||
|
||||
self.timeline_get_throttle.throttle(ctx, 1).await;
|
||||
|
||||
match self.conf.get_impl {
|
||||
GetImpl::Legacy => {
|
||||
let reconstruct_state = ValueReconstructState {
|
||||
records: Vec::new(),
|
||||
img: None,
|
||||
};
|
||||
let keyspace = KeySpace {
|
||||
ranges: vec![key..key.next()],
|
||||
};
|
||||
|
||||
self.get_impl(key, lsn, reconstruct_state, ctx).await
|
||||
}
|
||||
GetImpl::Vectored => {
|
||||
let keyspace = KeySpace {
|
||||
ranges: vec![key..key.next()],
|
||||
};
|
||||
// Initialise the reconstruct state for the key with the cache
|
||||
// entry returned above.
|
||||
let mut reconstruct_state = ValuesReconstructState::new();
|
||||
|
||||
// Initialise the reconstruct state for the key with the cache
|
||||
// entry returned above.
|
||||
let mut reconstruct_state = ValuesReconstructState::new();
|
||||
let vectored_res = self
|
||||
.get_vectored_impl(keyspace.clone(), lsn, &mut reconstruct_state, ctx)
|
||||
.await;
|
||||
|
||||
let vectored_res = self
|
||||
.get_vectored_impl(keyspace.clone(), lsn, &mut reconstruct_state, ctx)
|
||||
.await;
|
||||
|
||||
if self.conf.validate_vectored_get {
|
||||
self.validate_get_vectored_impl(&vectored_res, keyspace, lsn, ctx)
|
||||
.await;
|
||||
}
|
||||
|
||||
let key_value = vectored_res?.pop_first();
|
||||
match key_value {
|
||||
Some((got_key, value)) => {
|
||||
if got_key != key {
|
||||
error!(
|
||||
"Expected {}, but singular vectored get returned {}",
|
||||
key, got_key
|
||||
);
|
||||
Err(PageReconstructError::Other(anyhow!(
|
||||
"Singular vectored get returned wrong key"
|
||||
)))
|
||||
} else {
|
||||
value
|
||||
}
|
||||
}
|
||||
None => Err(PageReconstructError::MissingKey(MissingKeyError {
|
||||
key,
|
||||
shard: self.shard_identity.get_shard_number(&key),
|
||||
cont_lsn: Lsn(0),
|
||||
request_lsn: lsn,
|
||||
ancestor_lsn: None,
|
||||
traversal_path: Vec::new(),
|
||||
backtrace: None,
|
||||
})),
|
||||
let key_value = vectored_res?.pop_first();
|
||||
match key_value {
|
||||
Some((got_key, value)) => {
|
||||
if got_key != key {
|
||||
error!(
|
||||
"Expected {}, but singular vectored get returned {}",
|
||||
key, got_key
|
||||
);
|
||||
Err(PageReconstructError::Other(anyhow!(
|
||||
"Singular vectored get returned wrong key"
|
||||
)))
|
||||
} else {
|
||||
value
|
||||
}
|
||||
}
|
||||
None => Err(PageReconstructError::MissingKey(MissingKeyError {
|
||||
key,
|
||||
shard: self.shard_identity.get_shard_number(&key),
|
||||
cont_lsn: Lsn(0),
|
||||
request_lsn: lsn,
|
||||
ancestor_lsn: None,
|
||||
backtrace: None,
|
||||
})),
|
||||
}
|
||||
}
|
||||
|
||||
/// Not subject to [`Self::timeline_get_throttle`].
|
||||
async fn get_impl(
|
||||
&self,
|
||||
key: Key,
|
||||
lsn: Lsn,
|
||||
mut reconstruct_state: ValueReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Bytes, PageReconstructError> {
|
||||
// XXX: structured stats collection for layer eviction here.
|
||||
trace!(
|
||||
"get page request for {}@{} from task kind {:?}",
|
||||
key,
|
||||
lsn,
|
||||
ctx.task_kind()
|
||||
);
|
||||
|
||||
let timer = crate::metrics::GET_RECONSTRUCT_DATA_TIME
|
||||
.for_get_kind(GetKind::Singular)
|
||||
.start_timer();
|
||||
let path = self
|
||||
.get_reconstruct_data(key, lsn, &mut reconstruct_state, ctx)
|
||||
.await?;
|
||||
timer.stop_and_record();
|
||||
|
||||
let start = Instant::now();
|
||||
let res = self.reconstruct_value(key, lsn, reconstruct_state).await;
|
||||
let elapsed = start.elapsed();
|
||||
crate::metrics::RECONSTRUCT_TIME
|
||||
.for_get_kind(GetKind::Singular)
|
||||
.observe(elapsed.as_secs_f64());
|
||||
|
||||
if cfg!(feature = "testing")
|
||||
&& res.is_err()
|
||||
&& !matches!(res, Err(PageReconstructError::Cancelled))
|
||||
{
|
||||
// it can only be walredo issue
|
||||
use std::fmt::Write;
|
||||
|
||||
let mut msg = String::new();
|
||||
|
||||
path.into_iter().for_each(|(res, cont_lsn, layer)| {
|
||||
writeln!(
|
||||
msg,
|
||||
"- layer traversal: result {res:?}, cont_lsn {cont_lsn}, layer: {}",
|
||||
layer,
|
||||
)
|
||||
.expect("string grows")
|
||||
});
|
||||
|
||||
// this is to rule out or provide evidence that we could in some cases read a duplicate
|
||||
// walrecord
|
||||
tracing::info!("walredo failed, path:\n{msg}");
|
||||
}
|
||||
|
||||
res
|
||||
}
|
||||
|
||||
pub(crate) const MAX_GET_VECTORED_KEYS: u64 = 32;
|
||||
pub(crate) const VEC_GET_LAYERS_VISITED_WARN_THRESH: f64 = 512.0;
|
||||
|
||||
@@ -1076,28 +989,14 @@ impl Timeline {
|
||||
.throttle(ctx, key_count as usize)
|
||||
.await;
|
||||
|
||||
let res = match self.conf.get_vectored_impl {
|
||||
GetVectoredImpl::Sequential => {
|
||||
self.get_vectored_sequential_impl(keyspace, lsn, ctx).await
|
||||
}
|
||||
GetVectoredImpl::Vectored => {
|
||||
let vectored_res = self
|
||||
.get_vectored_impl(
|
||||
keyspace.clone(),
|
||||
lsn,
|
||||
&mut ValuesReconstructState::new(),
|
||||
ctx,
|
||||
)
|
||||
.await;
|
||||
|
||||
if self.conf.validate_vectored_get {
|
||||
self.validate_get_vectored_impl(&vectored_res, keyspace, lsn, ctx)
|
||||
.await;
|
||||
}
|
||||
|
||||
vectored_res
|
||||
}
|
||||
};
|
||||
let res = self
|
||||
.get_vectored_impl(
|
||||
keyspace.clone(),
|
||||
lsn,
|
||||
&mut ValuesReconstructState::new(),
|
||||
ctx,
|
||||
)
|
||||
.await;
|
||||
|
||||
if let Some((metric, start)) = start {
|
||||
let elapsed = start.elapsed();
|
||||
@@ -1186,65 +1085,6 @@ impl Timeline {
|
||||
vectored_res
|
||||
}
|
||||
|
||||
/// Not subject to [`Self::timeline_get_throttle`].
|
||||
pub(super) async fn get_vectored_sequential_impl(
|
||||
&self,
|
||||
keyspace: KeySpace,
|
||||
lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
|
||||
let mut values = BTreeMap::new();
|
||||
|
||||
for range in keyspace.ranges {
|
||||
let mut key = range.start;
|
||||
while key != range.end {
|
||||
let block = self
|
||||
.get_impl(key, lsn, ValueReconstructState::default(), ctx)
|
||||
.await;
|
||||
|
||||
use PageReconstructError::*;
|
||||
match block {
|
||||
Err(Cancelled) => return Err(GetVectoredError::Cancelled),
|
||||
Err(MissingKey(_))
|
||||
if NON_INHERITED_RANGE.contains(&key)
|
||||
|| NON_INHERITED_SPARSE_RANGE.contains(&key) =>
|
||||
{
|
||||
// Ignore missing key error for aux key range. TODO: currently, we assume non_inherited_range == aux_key_range.
|
||||
// When we add more types of keys into the page server, we should revisit this part of code and throw errors
|
||||
// accordingly.
|
||||
key = key.next();
|
||||
}
|
||||
Err(MissingKey(err)) => {
|
||||
return Err(GetVectoredError::MissingKey(err));
|
||||
}
|
||||
Err(Other(err))
|
||||
if err
|
||||
.to_string()
|
||||
.contains("downloading evicted layer file failed") =>
|
||||
{
|
||||
return Err(GetVectoredError::Other(err))
|
||||
}
|
||||
Err(Other(err))
|
||||
if err
|
||||
.chain()
|
||||
.any(|cause| cause.to_string().contains("layer loading failed")) =>
|
||||
{
|
||||
// The intent here is to achieve error parity with the vectored read path.
|
||||
// When vectored read fails to load a layer it fails the whole read, hence
|
||||
// we mimic this behaviour here to keep the validation happy.
|
||||
return Err(GetVectoredError::Other(err));
|
||||
}
|
||||
_ => {
|
||||
values.insert(key, block);
|
||||
key = key.next();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(values)
|
||||
}
|
||||
|
||||
pub(super) async fn get_vectored_impl(
|
||||
&self,
|
||||
keyspace: KeySpace,
|
||||
@@ -1315,113 +1155,6 @@ impl Timeline {
|
||||
Ok(results)
|
||||
}
|
||||
|
||||
/// Not subject to [`Self::timeline_get_throttle`].
|
||||
pub(super) async fn validate_get_vectored_impl(
|
||||
&self,
|
||||
vectored_res: &Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError>,
|
||||
keyspace: KeySpace,
|
||||
lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
) {
|
||||
if keyspace.overlaps(&Key::metadata_key_range()) {
|
||||
// skip validation for metadata key range
|
||||
return;
|
||||
}
|
||||
|
||||
let sequential_res = self
|
||||
.get_vectored_sequential_impl(keyspace.clone(), lsn, ctx)
|
||||
.await;
|
||||
|
||||
fn errors_match(lhs: &GetVectoredError, rhs: &GetVectoredError) -> bool {
|
||||
use GetVectoredError::*;
|
||||
match (lhs, rhs) {
|
||||
(Oversized(l), Oversized(r)) => l == r,
|
||||
(InvalidLsn(l), InvalidLsn(r)) => l == r,
|
||||
(MissingKey(l), MissingKey(r)) => l.key == r.key,
|
||||
(GetReadyAncestorError(_), GetReadyAncestorError(_)) => true,
|
||||
(Other(_), Other(_)) => true,
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
match (&sequential_res, vectored_res) {
|
||||
(Err(GetVectoredError::Cancelled), _) => {},
|
||||
(_, Err(GetVectoredError::Cancelled)) => {},
|
||||
(Err(seq_err), Ok(_)) => {
|
||||
panic!(concat!("Sequential get failed with {}, but vectored get did not",
|
||||
" - keyspace={:?} lsn={}"),
|
||||
seq_err, keyspace, lsn) },
|
||||
(Ok(_), Err(GetVectoredError::GetReadyAncestorError(GetReadyAncestorError::AncestorLsnTimeout(_)))) => {
|
||||
// Sequential get runs after vectored get, so it is possible for the later
|
||||
// to time out while waiting for its ancestor's Lsn to become ready and for the
|
||||
// former to succeed (it essentially has a doubled wait time).
|
||||
},
|
||||
(Ok(_), Err(vec_err)) => {
|
||||
panic!(concat!("Vectored get failed with {}, but sequential get did not",
|
||||
" - keyspace={:?} lsn={}"),
|
||||
vec_err, keyspace, lsn) },
|
||||
(Err(seq_err), Err(vec_err)) => {
|
||||
assert!(errors_match(seq_err, vec_err),
|
||||
"Mismatched errors: {seq_err} != {vec_err} - keyspace={keyspace:?} lsn={lsn}")},
|
||||
(Ok(seq_values), Ok(vec_values)) => {
|
||||
seq_values.iter().zip(vec_values.iter()).for_each(|((seq_key, seq_res), (vec_key, vec_res))| {
|
||||
assert_eq!(seq_key, vec_key);
|
||||
match (seq_res, vec_res) {
|
||||
(Ok(seq_blob), Ok(vec_blob)) => {
|
||||
Self::validate_key_equivalence(seq_key, &keyspace, lsn, seq_blob, vec_blob);
|
||||
},
|
||||
(Err(err), Ok(_)) => {
|
||||
panic!(
|
||||
concat!("Sequential get failed with {} for key {}, but vectored get did not",
|
||||
" - keyspace={:?} lsn={}"),
|
||||
err, seq_key, keyspace, lsn) },
|
||||
(Ok(_), Err(err)) => {
|
||||
panic!(
|
||||
concat!("Vectored get failed with {} for key {}, but sequential get did not",
|
||||
" - keyspace={:?} lsn={}"),
|
||||
err, seq_key, keyspace, lsn) },
|
||||
(Err(_), Err(_)) => {}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn validate_key_equivalence(
|
||||
key: &Key,
|
||||
keyspace: &KeySpace,
|
||||
lsn: Lsn,
|
||||
seq: &Bytes,
|
||||
vec: &Bytes,
|
||||
) {
|
||||
if *key == AUX_FILES_KEY {
|
||||
// The value reconstruct of AUX_FILES_KEY from records is not deterministic
|
||||
// since it uses a hash map under the hood. Hence, deserialise both results
|
||||
// before comparing.
|
||||
let seq_aux_dir_res = AuxFilesDirectory::des(seq);
|
||||
let vec_aux_dir_res = AuxFilesDirectory::des(vec);
|
||||
match (&seq_aux_dir_res, &vec_aux_dir_res) {
|
||||
(Ok(seq_aux_dir), Ok(vec_aux_dir)) => {
|
||||
assert_eq!(
|
||||
seq_aux_dir, vec_aux_dir,
|
||||
"Mismatch for key {} - keyspace={:?} lsn={}",
|
||||
key, keyspace, lsn
|
||||
);
|
||||
}
|
||||
(Err(_), Err(_)) => {}
|
||||
_ => {
|
||||
panic!("Mismatch for {key}: {seq_aux_dir_res:?} != {vec_aux_dir_res:?}");
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// All other keys should reconstruct deterministically, so we simply compare the blobs.
|
||||
assert_eq!(
|
||||
seq, vec,
|
||||
"Image mismatch for key {key} - keyspace={keyspace:?} lsn={lsn}"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/// Get last or prev record separately. Same as get_last_record_rlsn().last/prev.
|
||||
pub(crate) fn get_last_record_lsn(&self) -> Lsn {
|
||||
self.last_record_lsn.load().last
|
||||
@@ -3211,228 +2944,7 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
type TraversalId = Arc<str>;
|
||||
|
||||
trait TraversalLayerExt {
|
||||
fn traversal_id(&self) -> TraversalId;
|
||||
}
|
||||
|
||||
impl TraversalLayerExt for Layer {
|
||||
fn traversal_id(&self) -> TraversalId {
|
||||
Arc::clone(self.debug_str())
|
||||
}
|
||||
}
|
||||
|
||||
impl TraversalLayerExt for Arc<InMemoryLayer> {
|
||||
fn traversal_id(&self) -> TraversalId {
|
||||
Arc::clone(self.local_path_str())
|
||||
}
|
||||
}
|
||||
|
||||
impl Timeline {
|
||||
///
|
||||
/// Get a handle to a Layer for reading.
|
||||
///
|
||||
/// The returned Layer might be from an ancestor timeline, if the
|
||||
/// segment hasn't been updated on this timeline yet.
|
||||
///
|
||||
/// This function takes the current timeline's locked LayerMap as an argument,
|
||||
/// so callers can avoid potential race conditions.
|
||||
///
|
||||
/// # Cancel-Safety
|
||||
///
|
||||
/// This method is cancellation-safe.
|
||||
async fn get_reconstruct_data(
|
||||
&self,
|
||||
key: Key,
|
||||
request_lsn: Lsn,
|
||||
reconstruct_state: &mut ValueReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Vec<TraversalPathItem>, PageReconstructError> {
|
||||
// Start from the current timeline.
|
||||
let mut timeline_owned;
|
||||
let mut timeline = self;
|
||||
|
||||
let mut read_count = scopeguard::guard(0, |cnt| {
|
||||
crate::metrics::READ_NUM_LAYERS_VISITED.observe(cnt as f64)
|
||||
});
|
||||
|
||||
// For debugging purposes, collect the path of layers that we traversed
|
||||
// through. It's included in the error message if we fail to find the key.
|
||||
let mut traversal_path = Vec::<TraversalPathItem>::new();
|
||||
|
||||
let cached_lsn = if let Some((cached_lsn, _)) = &reconstruct_state.img {
|
||||
*cached_lsn
|
||||
} else {
|
||||
Lsn(0)
|
||||
};
|
||||
|
||||
// 'prev_lsn' tracks the last LSN that we were at in our search. It's used
|
||||
// to check that each iteration make some progress, to break infinite
|
||||
// looping if something goes wrong.
|
||||
let mut prev_lsn = None;
|
||||
|
||||
let mut result = ValueReconstructResult::Continue;
|
||||
let mut cont_lsn = Lsn(request_lsn.0 + 1);
|
||||
|
||||
'outer: loop {
|
||||
if self.cancel.is_cancelled() {
|
||||
return Err(PageReconstructError::Cancelled);
|
||||
}
|
||||
|
||||
// The function should have updated 'state'
|
||||
//info!("CALLED for {} at {}: {:?} with {} records, cached {}", key, cont_lsn, result, reconstruct_state.records.len(), cached_lsn);
|
||||
match result {
|
||||
ValueReconstructResult::Complete => return Ok(traversal_path),
|
||||
ValueReconstructResult::Continue => {
|
||||
// If we reached an earlier cached page image, we're done.
|
||||
if cont_lsn == cached_lsn + 1 {
|
||||
return Ok(traversal_path);
|
||||
}
|
||||
if let Some(prev) = prev_lsn {
|
||||
if prev <= cont_lsn {
|
||||
// Didn't make any progress in last iteration. Error out to avoid
|
||||
// getting stuck in the loop.
|
||||
return Err(PageReconstructError::MissingKey(MissingKeyError {
|
||||
key,
|
||||
shard: self.shard_identity.get_shard_number(&key),
|
||||
cont_lsn: Lsn(cont_lsn.0 - 1),
|
||||
request_lsn,
|
||||
ancestor_lsn: Some(timeline.ancestor_lsn),
|
||||
traversal_path,
|
||||
backtrace: None,
|
||||
}));
|
||||
}
|
||||
}
|
||||
prev_lsn = Some(cont_lsn);
|
||||
}
|
||||
ValueReconstructResult::Missing => {
|
||||
return Err(PageReconstructError::MissingKey(MissingKeyError {
|
||||
key,
|
||||
shard: self.shard_identity.get_shard_number(&key),
|
||||
cont_lsn,
|
||||
request_lsn,
|
||||
ancestor_lsn: None,
|
||||
traversal_path,
|
||||
backtrace: if cfg!(test) {
|
||||
Some(std::backtrace::Backtrace::force_capture())
|
||||
} else {
|
||||
None
|
||||
},
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
// Recurse into ancestor if needed
|
||||
if let Some(ancestor_timeline) = timeline.ancestor_timeline.as_ref() {
|
||||
if key.is_inherited_key() && Lsn(cont_lsn.0 - 1) <= timeline.ancestor_lsn {
|
||||
trace!(
|
||||
"going into ancestor {}, cont_lsn is {}",
|
||||
timeline.ancestor_lsn,
|
||||
cont_lsn
|
||||
);
|
||||
|
||||
timeline_owned = timeline
|
||||
.get_ready_ancestor_timeline(ancestor_timeline, ctx)
|
||||
.await?;
|
||||
timeline = &*timeline_owned;
|
||||
prev_lsn = None;
|
||||
continue 'outer;
|
||||
}
|
||||
}
|
||||
|
||||
let guard = timeline.layers.read().await;
|
||||
let layers = guard.layer_map();
|
||||
|
||||
// Check the open and frozen in-memory layers first, in order from newest
|
||||
// to oldest.
|
||||
if let Some(open_layer) = &layers.open_layer {
|
||||
let start_lsn = open_layer.get_lsn_range().start;
|
||||
if cont_lsn > start_lsn {
|
||||
//info!("CHECKING for {} at {} on open layer {}", key, cont_lsn, open_layer.layer_name().display());
|
||||
// Get all the data needed to reconstruct the page version from this layer.
|
||||
// But if we have an older cached page image, no need to go past that.
|
||||
let lsn_floor = max(cached_lsn + 1, start_lsn);
|
||||
|
||||
let open_layer = open_layer.clone();
|
||||
drop(guard);
|
||||
|
||||
result = match open_layer
|
||||
.get_value_reconstruct_data(
|
||||
key,
|
||||
lsn_floor..cont_lsn,
|
||||
reconstruct_state,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(result) => result,
|
||||
Err(e) => return Err(PageReconstructError::from(e)),
|
||||
};
|
||||
cont_lsn = lsn_floor;
|
||||
*read_count += 1;
|
||||
traversal_path.push((result, cont_lsn, open_layer.traversal_id()));
|
||||
continue 'outer;
|
||||
}
|
||||
}
|
||||
for frozen_layer in layers.frozen_layers.iter().rev() {
|
||||
let start_lsn = frozen_layer.get_lsn_range().start;
|
||||
if cont_lsn > start_lsn {
|
||||
//info!("CHECKING for {} at {} on frozen layer {}", key, cont_lsn, frozen_layer.layer_name().display());
|
||||
let lsn_floor = max(cached_lsn + 1, start_lsn);
|
||||
|
||||
let frozen_layer = frozen_layer.clone();
|
||||
drop(guard);
|
||||
|
||||
result = match frozen_layer
|
||||
.get_value_reconstruct_data(
|
||||
key,
|
||||
lsn_floor..cont_lsn,
|
||||
reconstruct_state,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(result) => result,
|
||||
Err(e) => return Err(PageReconstructError::from(e)),
|
||||
};
|
||||
cont_lsn = lsn_floor;
|
||||
*read_count += 1;
|
||||
traversal_path.push((result, cont_lsn, frozen_layer.traversal_id()));
|
||||
continue 'outer;
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(SearchResult { lsn_floor, layer }) = layers.search(key, cont_lsn) {
|
||||
let layer = guard.get_from_desc(&layer);
|
||||
drop(guard);
|
||||
// Get all the data needed to reconstruct the page version from this layer.
|
||||
// But if we have an older cached page image, no need to go past that.
|
||||
let lsn_floor = max(cached_lsn + 1, lsn_floor);
|
||||
result = match layer
|
||||
.get_value_reconstruct_data(key, lsn_floor..cont_lsn, reconstruct_state, ctx)
|
||||
.await
|
||||
{
|
||||
Ok(result) => result,
|
||||
Err(e) => return Err(PageReconstructError::from(e)),
|
||||
};
|
||||
cont_lsn = lsn_floor;
|
||||
*read_count += 1;
|
||||
traversal_path.push((result, cont_lsn, layer.traversal_id()));
|
||||
continue 'outer;
|
||||
} else if timeline.ancestor_timeline.is_some() {
|
||||
// Nothing on this timeline. Traverse to parent
|
||||
result = ValueReconstructResult::Continue;
|
||||
cont_lsn = Lsn(timeline.ancestor_lsn.0 + 1);
|
||||
continue 'outer;
|
||||
} else {
|
||||
// Nothing found
|
||||
result = ValueReconstructResult::Missing;
|
||||
continue 'outer;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::doc_lazy_continuation)]
|
||||
/// Get the data needed to reconstruct all keys in the provided keyspace
|
||||
///
|
||||
@@ -3526,7 +3038,6 @@ impl Timeline {
|
||||
cont_lsn,
|
||||
request_lsn,
|
||||
ancestor_lsn: Some(timeline.ancestor_lsn),
|
||||
traversal_path: vec![],
|
||||
backtrace: None,
|
||||
}));
|
||||
}
|
||||
@@ -4089,6 +3600,21 @@ impl Timeline {
|
||||
// release lock on 'layers'
|
||||
};
|
||||
|
||||
// Backpressure mechanism: wait with continuation of the flush loop until we have uploaded all layer files.
|
||||
// This makes us refuse ingest until the new layers have been persisted to the remote.
|
||||
self.remote_client
|
||||
.wait_completion()
|
||||
.await
|
||||
.map_err(|e| match e {
|
||||
WaitCompletionError::UploadQueueShutDownOrStopped
|
||||
| WaitCompletionError::NotInitialized(
|
||||
NotInitialized::ShuttingDown | NotInitialized::Stopped,
|
||||
) => FlushLayerError::Cancelled,
|
||||
WaitCompletionError::NotInitialized(NotInitialized::Uninitialized) => {
|
||||
FlushLayerError::Other(anyhow!(e).into())
|
||||
}
|
||||
})?;
|
||||
|
||||
// FIXME: between create_delta_layer and the scheduling of the upload in `update_metadata_file`,
|
||||
// a compaction can delete the file and then it won't be available for uploads any more.
|
||||
// We still schedule the upload, resulting in an error, but ideally we'd somehow avoid this
|
||||
@@ -4104,17 +3630,11 @@ impl Timeline {
|
||||
|
||||
/// Return true if the value changed
|
||||
///
|
||||
/// This function must only be used from the layer flush task, and may not be called concurrently.
|
||||
/// This function must only be used from the layer flush task.
|
||||
fn set_disk_consistent_lsn(&self, new_value: Lsn) -> bool {
|
||||
// We do a simple load/store cycle: that's why this function isn't safe for concurrent use.
|
||||
let old_value = self.disk_consistent_lsn.load();
|
||||
if new_value != old_value {
|
||||
assert!(new_value >= old_value);
|
||||
self.disk_consistent_lsn.store(new_value);
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
let old_value = self.disk_consistent_lsn.fetch_max(new_value);
|
||||
assert!(new_value >= old_value, "disk_consistent_lsn must be growing monotonously at runtime; current {old_value}, offered {new_value}");
|
||||
new_value != old_value
|
||||
}
|
||||
|
||||
/// Update metadata file
|
||||
@@ -4709,6 +4229,12 @@ impl Timeline {
|
||||
return;
|
||||
}
|
||||
|
||||
if self.current_logical_size.current_size().is_exact() {
|
||||
// root timelines are initialized with exact count, but never start the background
|
||||
// calculation
|
||||
return;
|
||||
}
|
||||
|
||||
if let Some(await_bg_cancel) = self
|
||||
.current_logical_size
|
||||
.cancel_wait_for_background_loop_concurrency_limit_semaphore
|
||||
@@ -5679,6 +5205,22 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
/// Persistently blocks gc for `Manual` reason.
|
||||
///
|
||||
/// Returns true if no such block existed before, false otherwise.
|
||||
pub(crate) async fn block_gc(&self, tenant: &super::Tenant) -> anyhow::Result<bool> {
|
||||
use crate::tenant::remote_timeline_client::index::GcBlockingReason;
|
||||
assert_eq!(self.tenant_shard_id, tenant.tenant_shard_id);
|
||||
tenant.gc_block.insert(self, GcBlockingReason::Manual).await
|
||||
}
|
||||
|
||||
/// Persistently unblocks gc for `Manual` reason.
|
||||
pub(crate) async fn unblock_gc(&self, tenant: &super::Tenant) -> anyhow::Result<()> {
|
||||
use crate::tenant::remote_timeline_client::index::GcBlockingReason;
|
||||
assert_eq!(self.tenant_shard_id, tenant.tenant_shard_id);
|
||||
tenant.gc_block.remove(self, GcBlockingReason::Manual).await
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(super) fn force_advance_lsn(self: &Arc<Timeline>, new_lsn: Lsn) {
|
||||
self.last_record_lsn.advance(new_lsn);
|
||||
@@ -5860,8 +5402,6 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
type TraversalPathItem = (ValueReconstructResult, Lsn, TraversalId);
|
||||
|
||||
/// Tracking writes ingestion does to a particular in-memory layer.
|
||||
///
|
||||
/// Cleared upon freezing a layer.
|
||||
|
||||
@@ -19,8 +19,10 @@ use bytes::Bytes;
|
||||
use enumset::EnumSet;
|
||||
use fail::fail_point;
|
||||
use itertools::Itertools;
|
||||
use pageserver_api::key::KEY_SIZE;
|
||||
use pageserver_api::keyspace::ShardedRange;
|
||||
use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
|
||||
use serde::Serialize;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, info, info_span, trace, warn, Instrument};
|
||||
use utils::id::TimelineId;
|
||||
@@ -41,6 +43,7 @@ use crate::virtual_file::{MaybeFatalIo, VirtualFile};
|
||||
|
||||
use crate::keyspace::KeySpace;
|
||||
use crate::repository::{Key, Value};
|
||||
use crate::walrecord::NeonWalRecord;
|
||||
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
@@ -73,6 +76,7 @@ impl KeyHistoryRetention {
|
||||
key: Key,
|
||||
delta_writer: &mut Vec<(Key, Lsn, Value)>,
|
||||
mut image_writer: Option<&mut ImageLayerWriter>,
|
||||
stat: &mut CompactionStatistics,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut first_batch = true;
|
||||
@@ -82,6 +86,7 @@ impl KeyHistoryRetention {
|
||||
let Value::Image(img) = &logs[0].1 else {
|
||||
unreachable!()
|
||||
};
|
||||
stat.produce_image_key(img);
|
||||
if let Some(image_writer) = image_writer.as_mut() {
|
||||
image_writer.put_image(key, img.clone(), ctx).await?;
|
||||
} else {
|
||||
@@ -89,24 +94,111 @@ impl KeyHistoryRetention {
|
||||
}
|
||||
} else {
|
||||
for (lsn, val) in logs {
|
||||
stat.produce_key(&val);
|
||||
delta_writer.push((key, lsn, val));
|
||||
}
|
||||
}
|
||||
first_batch = false;
|
||||
} else {
|
||||
for (lsn, val) in logs {
|
||||
stat.produce_key(&val);
|
||||
delta_writer.push((key, lsn, val));
|
||||
}
|
||||
}
|
||||
}
|
||||
let KeyLogAtLsn(above_horizon_logs) = self.above_horizon;
|
||||
for (lsn, val) in above_horizon_logs {
|
||||
stat.produce_key(&val);
|
||||
delta_writer.push((key, lsn, val));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Default)]
|
||||
struct CompactionStatisticsNumSize {
|
||||
num: u64,
|
||||
size: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Default)]
|
||||
pub struct CompactionStatistics {
|
||||
delta_layer_visited: CompactionStatisticsNumSize,
|
||||
image_layer_visited: CompactionStatisticsNumSize,
|
||||
delta_layer_produced: CompactionStatisticsNumSize,
|
||||
image_layer_produced: CompactionStatisticsNumSize,
|
||||
num_delta_layer_discarded: usize,
|
||||
num_image_layer_discarded: usize,
|
||||
num_unique_keys_visited: usize,
|
||||
wal_keys_visited: CompactionStatisticsNumSize,
|
||||
image_keys_visited: CompactionStatisticsNumSize,
|
||||
wal_produced: CompactionStatisticsNumSize,
|
||||
image_produced: CompactionStatisticsNumSize,
|
||||
}
|
||||
|
||||
impl CompactionStatistics {
|
||||
fn estimated_size_of_value(val: &Value) -> usize {
|
||||
match val {
|
||||
Value::Image(img) => img.len(),
|
||||
Value::WalRecord(NeonWalRecord::Postgres { rec, .. }) => rec.len(),
|
||||
_ => std::mem::size_of::<NeonWalRecord>(),
|
||||
}
|
||||
}
|
||||
fn estimated_size_of_key() -> usize {
|
||||
KEY_SIZE // TODO: distinguish image layer and delta layer (count LSN in delta layer)
|
||||
}
|
||||
fn visit_delta_layer(&mut self, size: u64) {
|
||||
self.delta_layer_visited.num += 1;
|
||||
self.delta_layer_visited.size += size;
|
||||
}
|
||||
fn visit_image_layer(&mut self, size: u64) {
|
||||
self.image_layer_visited.num += 1;
|
||||
self.image_layer_visited.size += size;
|
||||
}
|
||||
fn on_unique_key_visited(&mut self) {
|
||||
self.num_unique_keys_visited += 1;
|
||||
}
|
||||
fn visit_wal_key(&mut self, val: &Value) {
|
||||
self.wal_keys_visited.num += 1;
|
||||
self.wal_keys_visited.size +=
|
||||
Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
|
||||
}
|
||||
fn visit_image_key(&mut self, val: &Value) {
|
||||
self.image_keys_visited.num += 1;
|
||||
self.image_keys_visited.size +=
|
||||
Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
|
||||
}
|
||||
fn produce_key(&mut self, val: &Value) {
|
||||
match val {
|
||||
Value::Image(img) => self.produce_image_key(img),
|
||||
Value::WalRecord(_) => self.produce_wal_key(val),
|
||||
}
|
||||
}
|
||||
fn produce_wal_key(&mut self, val: &Value) {
|
||||
self.wal_produced.num += 1;
|
||||
self.wal_produced.size +=
|
||||
Self::estimated_size_of_value(val) as u64 + Self::estimated_size_of_key() as u64;
|
||||
}
|
||||
fn produce_image_key(&mut self, val: &Bytes) {
|
||||
self.image_produced.num += 1;
|
||||
self.image_produced.size += val.len() as u64 + Self::estimated_size_of_key() as u64;
|
||||
}
|
||||
fn discard_delta_layer(&mut self) {
|
||||
self.num_delta_layer_discarded += 1;
|
||||
}
|
||||
fn discard_image_layer(&mut self) {
|
||||
self.num_image_layer_discarded += 1;
|
||||
}
|
||||
fn produce_delta_layer(&mut self, size: u64) {
|
||||
self.delta_layer_produced.num += 1;
|
||||
self.delta_layer_produced.size += size;
|
||||
}
|
||||
fn produce_image_layer(&mut self, size: u64) {
|
||||
self.image_layer_produced.num += 1;
|
||||
self.image_layer_produced.size += size;
|
||||
}
|
||||
}
|
||||
|
||||
impl Timeline {
|
||||
/// TODO: cancellation
|
||||
///
|
||||
@@ -118,12 +210,18 @@ impl Timeline {
|
||||
ctx: &RequestContext,
|
||||
) -> Result<bool, CompactionError> {
|
||||
if flags.contains(CompactFlags::EnhancedGcBottomMostCompaction) {
|
||||
self.compact_with_gc(cancel, ctx)
|
||||
self.compact_with_gc(cancel, flags, ctx)
|
||||
.await
|
||||
.map_err(CompactionError::Other)?;
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
if flags.contains(CompactFlags::DryRun) {
|
||||
return Err(CompactionError::Other(anyhow!(
|
||||
"dry-run mode is not supported for legacy compaction for now"
|
||||
)));
|
||||
}
|
||||
|
||||
// High level strategy for compaction / image creation:
|
||||
//
|
||||
// 1. First, calculate the desired "partitioning" of the
|
||||
@@ -1641,31 +1739,40 @@ impl Timeline {
|
||||
pub(crate) async fn compact_with_gc(
|
||||
self: &Arc<Self>,
|
||||
cancel: &CancellationToken,
|
||||
flags: EnumSet<CompactFlags>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
use std::collections::BTreeSet;
|
||||
|
||||
// Block other compaction/GC tasks from running for now. GC-compaction could run along
|
||||
// with legacy compaction tasks in the future.
|
||||
// with legacy compaction tasks in the future. Always ensure the lock order is compaction -> gc.
|
||||
// Note that we already acquired the compaction lock when the outer `compact` function gets called.
|
||||
|
||||
let _compaction_lock = tokio::select! {
|
||||
guard = self.compaction_lock.lock() => guard,
|
||||
// TODO: refactor to CompactionError to correctly pass cancelled error
|
||||
_ = cancel.cancelled() => return Err(anyhow!("cancelled")),
|
||||
let gc_lock = async {
|
||||
tokio::select! {
|
||||
guard = self.gc_lock.lock() => Ok(guard),
|
||||
// TODO: refactor to CompactionError to correctly pass cancelled error
|
||||
_ = cancel.cancelled() => Err(anyhow!("cancelled")),
|
||||
}
|
||||
};
|
||||
|
||||
let _gc = tokio::select! {
|
||||
guard = self.gc_lock.lock() => guard,
|
||||
// TODO: refactor to CompactionError to correctly pass cancelled error
|
||||
_ = cancel.cancelled() => return Err(anyhow!("cancelled")),
|
||||
};
|
||||
let gc_lock = crate::timed(
|
||||
gc_lock,
|
||||
"acquires gc lock",
|
||||
std::time::Duration::from_secs(5),
|
||||
)
|
||||
.await?;
|
||||
|
||||
info!("running enhanced gc bottom-most compaction");
|
||||
let dry_run = flags.contains(CompactFlags::DryRun);
|
||||
|
||||
info!("running enhanced gc bottom-most compaction, dry_run={dry_run}");
|
||||
|
||||
scopeguard::defer! {
|
||||
info!("done enhanced gc bottom-most compaction");
|
||||
};
|
||||
|
||||
let mut stat = CompactionStatistics::default();
|
||||
|
||||
// Step 0: pick all delta layers + image layers below/intersect with the GC horizon.
|
||||
// The layer selection has the following properties:
|
||||
// 1. If a layer is in the selection, all layers below it are in the selection.
|
||||
@@ -1736,6 +1843,9 @@ impl Timeline {
|
||||
let key_range = desc.get_key_range();
|
||||
delta_split_points.insert(key_range.start);
|
||||
delta_split_points.insert(key_range.end);
|
||||
stat.visit_delta_layer(desc.file_size());
|
||||
} else {
|
||||
stat.visit_image_layer(desc.file_size());
|
||||
}
|
||||
}
|
||||
let mut delta_layers = Vec::new();
|
||||
@@ -1771,6 +1881,8 @@ impl Timeline {
|
||||
tline: &Arc<Timeline>,
|
||||
lowest_retain_lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
stats: &mut CompactionStatistics,
|
||||
dry_run: bool,
|
||||
last_batch: bool,
|
||||
) -> anyhow::Result<Option<FlushDeltaResult>> {
|
||||
// Check if we need to split the delta layer. We split at the original delta layer boundary to avoid
|
||||
@@ -1827,6 +1939,7 @@ impl Timeline {
|
||||
let layer_generation = guard.get_from_key(&delta_key).metadata().generation;
|
||||
drop(guard);
|
||||
if layer_generation == tline.generation {
|
||||
stats.discard_delta_layer();
|
||||
// TODO: depending on whether we design this compaction process to run along with
|
||||
// other compactions, there could be layer map modifications after we drop the
|
||||
// layer guard, and in case it creates duplicated layer key, we will still error
|
||||
@@ -1853,6 +1966,10 @@ impl Timeline {
|
||||
for (key, lsn, val) in deltas {
|
||||
delta_layer_writer.put_value(key, lsn, val, ctx).await?;
|
||||
}
|
||||
stats.produce_delta_layer(delta_layer_writer.size());
|
||||
if dry_run {
|
||||
return Ok(None);
|
||||
}
|
||||
let delta_layer = delta_layer_writer
|
||||
.finish(delta_key.key_range.end, tline, ctx)
|
||||
.await?;
|
||||
@@ -1947,6 +2064,13 @@ impl Timeline {
|
||||
let mut current_delta_split_point = 0;
|
||||
let mut delta_layers = Vec::new();
|
||||
while let Some((key, lsn, val)) = merge_iter.next().await? {
|
||||
if cancel.is_cancelled() {
|
||||
return Err(anyhow!("cancelled")); // TODO: refactor to CompactionError and pass cancel error
|
||||
}
|
||||
match val {
|
||||
Value::Image(_) => stat.visit_image_key(&val),
|
||||
Value::WalRecord(_) => stat.visit_wal_key(&val),
|
||||
}
|
||||
if last_key.is_none() || last_key.as_ref() == Some(&key) {
|
||||
if last_key.is_none() {
|
||||
last_key = Some(key);
|
||||
@@ -1954,6 +2078,7 @@ impl Timeline {
|
||||
accumulated_values.push((key, lsn, val));
|
||||
} else {
|
||||
let last_key = last_key.as_mut().unwrap();
|
||||
stat.on_unique_key_visited();
|
||||
let retention = self
|
||||
.generate_key_retention(
|
||||
*last_key,
|
||||
@@ -1970,6 +2095,7 @@ impl Timeline {
|
||||
*last_key,
|
||||
&mut delta_values,
|
||||
image_layer_writer.as_mut(),
|
||||
&mut stat,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
@@ -1982,6 +2108,8 @@ impl Timeline {
|
||||
self,
|
||||
lowest_retain_lsn,
|
||||
ctx,
|
||||
&mut stat,
|
||||
dry_run,
|
||||
false,
|
||||
)
|
||||
.await?,
|
||||
@@ -1994,6 +2122,7 @@ impl Timeline {
|
||||
|
||||
let last_key = last_key.expect("no keys produced during compaction");
|
||||
// TODO: move this part to the loop body
|
||||
stat.on_unique_key_visited();
|
||||
let retention = self
|
||||
.generate_key_retention(
|
||||
last_key,
|
||||
@@ -2010,6 +2139,7 @@ impl Timeline {
|
||||
last_key,
|
||||
&mut delta_values,
|
||||
image_layer_writer.as_mut(),
|
||||
&mut stat,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
@@ -2022,6 +2152,8 @@ impl Timeline {
|
||||
self,
|
||||
lowest_retain_lsn,
|
||||
ctx,
|
||||
&mut stat,
|
||||
dry_run,
|
||||
true,
|
||||
)
|
||||
.await?,
|
||||
@@ -2029,12 +2161,28 @@ impl Timeline {
|
||||
assert!(delta_values.is_empty(), "unprocessed keys");
|
||||
|
||||
let image_layer = if discard_image_layer {
|
||||
stat.discard_image_layer();
|
||||
None
|
||||
} else if let Some(writer) = image_layer_writer {
|
||||
Some(writer.finish(self, ctx).await?)
|
||||
stat.produce_image_layer(writer.size());
|
||||
if !dry_run {
|
||||
Some(writer.finish(self, ctx).await?)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
info!(
|
||||
"gc-compaction statistics: {}",
|
||||
serde_json::to_string(&stat)?
|
||||
);
|
||||
|
||||
if dry_run {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
info!(
|
||||
"produced {} delta layers and {} image layers",
|
||||
delta_layers.len(),
|
||||
@@ -2058,14 +2206,17 @@ impl Timeline {
|
||||
let mut layer_selection = layer_selection;
|
||||
layer_selection.retain(|x| !keep_layers.contains(&x.layer_desc().key()));
|
||||
compact_to.extend(image_layer);
|
||||
|
||||
// Step 3: Place back to the layer map.
|
||||
{
|
||||
let mut guard = self.layers.write().await;
|
||||
guard.finish_gc_compaction(&layer_selection, &compact_to, &self.metrics)
|
||||
};
|
||||
|
||||
self.remote_client
|
||||
.schedule_compaction_update(&layer_selection, &compact_to)?;
|
||||
|
||||
drop(gc_lock);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -63,10 +63,19 @@ pub(super) async fn delete_local_timeline_directory(
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline: &Timeline,
|
||||
) -> anyhow::Result<()> {
|
||||
let guards = async { tokio::join!(timeline.gc_lock.lock(), timeline.compaction_lock.lock()) };
|
||||
let guards = crate::timed(
|
||||
guards,
|
||||
"acquire gc and compaction locks",
|
||||
// Always ensure the lock order is compaction -> gc.
|
||||
let compaction_lock = timeline.compaction_lock.lock();
|
||||
let compaction_lock = crate::timed(
|
||||
compaction_lock,
|
||||
"acquires compaction lock",
|
||||
std::time::Duration::from_secs(5),
|
||||
)
|
||||
.await;
|
||||
|
||||
let gc_lock = timeline.gc_lock.lock();
|
||||
let gc_lock = crate::timed(
|
||||
gc_lock,
|
||||
"acquires gc lock",
|
||||
std::time::Duration::from_secs(5),
|
||||
)
|
||||
.await;
|
||||
@@ -107,7 +116,8 @@ pub(super) async fn delete_local_timeline_directory(
|
||||
.context("fsync_pre_mark_remove")?;
|
||||
|
||||
info!("finished deleting layer files, releasing locks");
|
||||
drop(guards);
|
||||
drop(gc_lock);
|
||||
drop(compaction_lock);
|
||||
|
||||
fail::fail_point!("timeline-delete-after-rm", |_| {
|
||||
Err(anyhow::anyhow!("failpoint: timeline-delete-after-rm"))?
|
||||
@@ -220,6 +230,8 @@ impl DeleteTimelineFlow {
|
||||
// Now that the Timeline is in Stopping state, request all the related tasks to shut down.
|
||||
timeline.shutdown(super::ShutdownMode::Hard).await;
|
||||
|
||||
tenant.gc_block.before_delete(&timeline);
|
||||
|
||||
fail::fail_point!("timeline-delete-before-index-deleted-at", |_| {
|
||||
Err(anyhow::anyhow!(
|
||||
"failpoint: timeline-delete-before-index-deleted-at"
|
||||
|
||||
@@ -122,6 +122,10 @@ impl CurrentLogicalSize {
|
||||
Self::Exact(_) => Accuracy::Exact,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn is_exact(&self) -> bool {
|
||||
matches!(self, Self::Exact(_))
|
||||
}
|
||||
}
|
||||
|
||||
impl LogicalSize {
|
||||
|
||||
@@ -92,7 +92,6 @@ impl WalIngest {
|
||||
decoded: &mut DecodedWALRecord,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<bool> {
|
||||
eprintln!("ingest_record @ {lsn}");
|
||||
WAL_INGEST.records_received.inc();
|
||||
let pg_version = modification.tline.pg_version;
|
||||
let prev_len = modification.len();
|
||||
|
||||
16
poetry.lock
generated
16
poetry.lock
generated
@@ -1514,6 +1514,20 @@ files = [
|
||||
[package.dependencies]
|
||||
six = "*"
|
||||
|
||||
[[package]]
|
||||
name = "kafka-python"
|
||||
version = "2.0.2"
|
||||
description = "Pure Python client for Apache Kafka"
|
||||
optional = false
|
||||
python-versions = "*"
|
||||
files = [
|
||||
{file = "kafka-python-2.0.2.tar.gz", hash = "sha256:04dfe7fea2b63726cd6f3e79a2d86e709d608d74406638c5da33a01d45a9d7e3"},
|
||||
{file = "kafka_python-2.0.2-py2.py3-none-any.whl", hash = "sha256:2d92418c7cb1c298fa6c7f0fb3519b520d0d7526ac6cb7ae2a4fc65a51a94b6e"},
|
||||
]
|
||||
|
||||
[package.extras]
|
||||
crc32c = ["crc32c"]
|
||||
|
||||
[[package]]
|
||||
name = "lazy-object-proxy"
|
||||
version = "1.10.0"
|
||||
@@ -3357,4 +3371,4 @@ cffi = ["cffi (>=1.11)"]
|
||||
[metadata]
|
||||
lock-version = "2.0"
|
||||
python-versions = "^3.9"
|
||||
content-hash = "7cee6a8c30bc7f4bfb0a87c6bad3952dfb4da127fad853d2710a93ac3eab8a00"
|
||||
content-hash = "d569a3593b98baceb0a88e176bdad63cae99d6bfc2a81bf6741663a4abcafd72"
|
||||
|
||||
@@ -1,8 +1,7 @@
|
||||
[tool.poetry]
|
||||
name = "neon"
|
||||
version = "0.1.0"
|
||||
description = ""
|
||||
authors = []
|
||||
package-mode = false
|
||||
|
||||
[tool.poetry.dependencies]
|
||||
python = "^3.9"
|
||||
@@ -42,6 +41,7 @@ httpx = {extras = ["http2"], version = "^0.26.0"}
|
||||
pytest-repeat = "^0.9.3"
|
||||
websockets = "^12.0"
|
||||
clickhouse-connect = "^0.7.16"
|
||||
kafka-python = "^2.0.2"
|
||||
|
||||
[tool.poetry.group.dev.dependencies]
|
||||
mypy = "==1.3.0"
|
||||
@@ -75,6 +75,7 @@ module = [
|
||||
"allure.*",
|
||||
"allure_commons.*",
|
||||
"allure_pytest.*",
|
||||
"kafka.*",
|
||||
]
|
||||
ignore_missing_imports = true
|
||||
|
||||
|
||||
@@ -7,6 +7,7 @@ use clap::{ArgAction, Parser};
|
||||
use futures::future::BoxFuture;
|
||||
use futures::stream::FuturesUnordered;
|
||||
use futures::{FutureExt, StreamExt};
|
||||
use metrics::launch_timestamp::{set_launch_timestamp_metric, LaunchTimestamp};
|
||||
use remote_storage::RemoteStorageConfig;
|
||||
use sd_notify::NotifyState;
|
||||
use tokio::runtime::Handle;
|
||||
@@ -170,11 +171,6 @@ struct Args {
|
||||
/// still needed for existing replication connection.
|
||||
#[arg(long)]
|
||||
walsenders_keep_horizon: bool,
|
||||
/// Enable partial backup. If disabled, safekeeper will not upload partial
|
||||
/// segments to remote storage.
|
||||
/// TODO: now partial backup is always enabled, remove this flag.
|
||||
#[arg(long)]
|
||||
partial_backup_enabled: bool,
|
||||
/// Controls how long backup will wait until uploading the partial segment.
|
||||
#[arg(long, value_parser = humantime::parse_duration, default_value = DEFAULT_PARTIAL_BACKUP_TIMEOUT, verbatim_doc_comment)]
|
||||
partial_backup_timeout: Duration,
|
||||
@@ -209,6 +205,7 @@ fn opt_pathbuf_parser(s: &str) -> Result<Utf8PathBuf, String> {
|
||||
|
||||
#[tokio::main(flavor = "current_thread")]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
let launch_ts = Box::leak(Box::new(LaunchTimestamp::generate()));
|
||||
// We want to allow multiple occurences of the same arg (taking the last) so
|
||||
// that neon_local could generate command with defaults + overrides without
|
||||
// getting 'argument cannot be used multiple times' error. This seems to be
|
||||
@@ -347,7 +344,6 @@ async fn main() -> anyhow::Result<()> {
|
||||
sk_auth_token,
|
||||
current_thread_runtime: args.current_thread_runtime,
|
||||
walsenders_keep_horizon: args.walsenders_keep_horizon,
|
||||
partial_backup_enabled: true,
|
||||
partial_backup_timeout: args.partial_backup_timeout,
|
||||
disable_periodic_broker_push: args.disable_periodic_broker_push,
|
||||
enable_offload: args.enable_offload,
|
||||
@@ -362,14 +358,14 @@ async fn main() -> anyhow::Result<()> {
|
||||
Some(GIT_VERSION.into()),
|
||||
&[("node_id", &conf.my_id.to_string())],
|
||||
);
|
||||
start_safekeeper(conf).await
|
||||
start_safekeeper(launch_ts, conf).await
|
||||
}
|
||||
|
||||
/// Result of joining any of main tasks: upper error means task failed to
|
||||
/// complete, e.g. panicked, inner is error produced by task itself.
|
||||
type JoinTaskRes = Result<anyhow::Result<()>, JoinError>;
|
||||
|
||||
async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
|
||||
async fn start_safekeeper(launch_ts: &'static LaunchTimestamp, conf: SafeKeeperConf) -> Result<()> {
|
||||
// Prevent running multiple safekeepers on the same directory
|
||||
let lock_file_path = conf.workdir.join(PID_FILE_NAME);
|
||||
let lock_file =
|
||||
@@ -497,6 +493,7 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
|
||||
tasks_handles.push(Box::pin(broker_task_handle));
|
||||
|
||||
set_build_info_metric(GIT_VERSION, BUILD_TAG);
|
||||
set_launch_timestamp_metric(launch_ts);
|
||||
|
||||
// TODO: update tokio-stream, convert to real async Stream with
|
||||
// SignalStream, map it to obtain missing signal name, combine streams into
|
||||
|
||||
@@ -21,6 +21,7 @@ pub mod json_ctrl;
|
||||
pub mod metrics;
|
||||
pub mod patch_control_file;
|
||||
pub mod pull_timeline;
|
||||
pub mod rate_limit;
|
||||
pub mod receive_wal;
|
||||
pub mod recovery;
|
||||
pub mod remove_wal;
|
||||
@@ -53,6 +54,7 @@ pub mod defaults {
|
||||
pub const DEFAULT_PARTIAL_BACKUP_TIMEOUT: &str = "15m";
|
||||
pub const DEFAULT_CONTROL_FILE_SAVE_INTERVAL: &str = "300s";
|
||||
pub const DEFAULT_PARTIAL_BACKUP_CONCURRENCY: &str = "5";
|
||||
pub const DEFAULT_EVICTION_CONCURRENCY: usize = 2;
|
||||
|
||||
// By default, our required residency before eviction is the same as the period that passes
|
||||
// before uploading a partial segment, so that in normal operation the eviction can happen
|
||||
@@ -91,7 +93,6 @@ pub struct SafeKeeperConf {
|
||||
pub sk_auth_token: Option<SecretString>,
|
||||
pub current_thread_runtime: bool,
|
||||
pub walsenders_keep_horizon: bool,
|
||||
pub partial_backup_enabled: bool,
|
||||
pub partial_backup_timeout: Duration,
|
||||
pub disable_periodic_broker_push: bool,
|
||||
pub enable_offload: bool,
|
||||
@@ -135,7 +136,6 @@ impl SafeKeeperConf {
|
||||
max_offloader_lag_bytes: defaults::DEFAULT_MAX_OFFLOADER_LAG_BYTES,
|
||||
current_thread_runtime: false,
|
||||
walsenders_keep_horizon: false,
|
||||
partial_backup_enabled: false,
|
||||
partial_backup_timeout: Duration::from_secs(0),
|
||||
disable_periodic_broker_push: false,
|
||||
enable_offload: false,
|
||||
|
||||
49
safekeeper/src/rate_limit.rs
Normal file
49
safekeeper/src/rate_limit.rs
Normal file
@@ -0,0 +1,49 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use rand::Rng;
|
||||
|
||||
use crate::metrics::MISC_OPERATION_SECONDS;
|
||||
|
||||
/// Global rate limiter for background tasks.
|
||||
#[derive(Clone)]
|
||||
pub struct RateLimiter {
|
||||
partial_backup: Arc<tokio::sync::Semaphore>,
|
||||
eviction: Arc<tokio::sync::Semaphore>,
|
||||
}
|
||||
|
||||
impl RateLimiter {
|
||||
/// Create a new rate limiter.
|
||||
/// - `partial_backup_max`: maximum number of concurrent partial backups.
|
||||
/// - `eviction_max`: maximum number of concurrent timeline evictions.
|
||||
pub fn new(partial_backup_max: usize, eviction_max: usize) -> Self {
|
||||
Self {
|
||||
partial_backup: Arc::new(tokio::sync::Semaphore::new(partial_backup_max)),
|
||||
eviction: Arc::new(tokio::sync::Semaphore::new(eviction_max)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get a permit for partial backup. This will block if the maximum number of concurrent
|
||||
/// partial backups is reached.
|
||||
pub async fn acquire_partial_backup(&self) -> tokio::sync::OwnedSemaphorePermit {
|
||||
let _timer = MISC_OPERATION_SECONDS
|
||||
.with_label_values(&["partial_permit_acquire"])
|
||||
.start_timer();
|
||||
self.partial_backup
|
||||
.clone()
|
||||
.acquire_owned()
|
||||
.await
|
||||
.expect("semaphore is closed")
|
||||
}
|
||||
|
||||
/// Try to get a permit for timeline eviction. This will return None if the maximum number of
|
||||
/// concurrent timeline evictions is reached.
|
||||
pub fn try_acquire_eviction(&self) -> Option<tokio::sync::OwnedSemaphorePermit> {
|
||||
self.eviction.clone().try_acquire_owned().ok()
|
||||
}
|
||||
}
|
||||
|
||||
/// Generate a random duration that is a fraction of the given duration.
|
||||
pub fn rand_duration(duration: &std::time::Duration) -> std::time::Duration {
|
||||
let randf64 = rand::thread_rng().gen_range(0.0..1.0);
|
||||
duration.mul_f64(randf64)
|
||||
}
|
||||
@@ -25,6 +25,7 @@ use utils::{
|
||||
use storage_broker::proto::SafekeeperTimelineInfo;
|
||||
use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
|
||||
|
||||
use crate::rate_limit::RateLimiter;
|
||||
use crate::receive_wal::WalReceivers;
|
||||
use crate::safekeeper::{
|
||||
AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, ServerInfo, Term, TermLsn,
|
||||
@@ -36,7 +37,7 @@ use crate::timeline_guard::ResidenceGuard;
|
||||
use crate::timeline_manager::{AtomicStatus, ManagerCtl};
|
||||
use crate::timelines_set::TimelinesSet;
|
||||
use crate::wal_backup::{self};
|
||||
use crate::wal_backup_partial::{PartialRemoteSegment, RateLimiter};
|
||||
use crate::wal_backup_partial::PartialRemoteSegment;
|
||||
use crate::{control_file, safekeeper::UNKNOWN_SERVER_VERSION};
|
||||
|
||||
use crate::metrics::{FullTimelineInfo, WalStorageMetrics, MISC_OPERATION_SECONDS};
|
||||
|
||||
@@ -5,7 +5,6 @@
|
||||
use anyhow::Context;
|
||||
use camino::Utf8PathBuf;
|
||||
use remote_storage::RemotePath;
|
||||
use std::time::Instant;
|
||||
use tokio::{
|
||||
fs::File,
|
||||
io::{AsyncRead, AsyncWriteExt},
|
||||
@@ -15,6 +14,7 @@ use utils::crashsafe::durable_rename;
|
||||
|
||||
use crate::{
|
||||
metrics::{EvictionEvent, EVICTION_EVENTS_COMPLETED, EVICTION_EVENTS_STARTED},
|
||||
rate_limit::rand_duration,
|
||||
timeline_manager::{Manager, StateSnapshot},
|
||||
wal_backup,
|
||||
wal_backup_partial::{self, PartialRemoteSegment},
|
||||
@@ -50,7 +50,6 @@ impl Manager {
|
||||
.flush_lsn
|
||||
.segment_number(self.wal_seg_size)
|
||||
== self.last_removed_segno + 1
|
||||
&& self.resident_since.elapsed() >= self.conf.eviction_min_resident
|
||||
}
|
||||
|
||||
/// Evict the timeline to remote storage.
|
||||
@@ -112,7 +111,8 @@ impl Manager {
|
||||
return;
|
||||
}
|
||||
|
||||
self.resident_since = Instant::now();
|
||||
self.evict_not_before =
|
||||
tokio::time::Instant::now() + rand_duration(&self.conf.eviction_min_resident);
|
||||
|
||||
info!("successfully restored evicted timeline");
|
||||
}
|
||||
|
||||
@@ -23,6 +23,7 @@ use utils::lsn::Lsn;
|
||||
use crate::{
|
||||
control_file::{FileStorage, Storage},
|
||||
metrics::{MANAGER_ACTIVE_CHANGES, MANAGER_ITERATIONS_TOTAL, MISC_OPERATION_SECONDS},
|
||||
rate_limit::{rand_duration, RateLimiter},
|
||||
recovery::recovery_main,
|
||||
remove_wal::calc_horizon_lsn,
|
||||
safekeeper::Term,
|
||||
@@ -32,7 +33,7 @@ use crate::{
|
||||
timeline_guard::{AccessService, GuardId, ResidenceGuard},
|
||||
timelines_set::{TimelineSetGuard, TimelinesSet},
|
||||
wal_backup::{self, WalBackupTaskHandle},
|
||||
wal_backup_partial::{self, PartialRemoteSegment, RateLimiter},
|
||||
wal_backup_partial::{self, PartialRemoteSegment},
|
||||
SafeKeeperConf,
|
||||
};
|
||||
|
||||
@@ -185,11 +186,11 @@ pub(crate) struct Manager {
|
||||
|
||||
// misc
|
||||
pub(crate) access_service: AccessService,
|
||||
pub(crate) partial_backup_rate_limiter: RateLimiter,
|
||||
pub(crate) global_rate_limiter: RateLimiter,
|
||||
|
||||
// Anti-flapping state: we evict timelines eagerly if they are inactive, but should not
|
||||
// evict them if they go inactive very soon after being restored.
|
||||
pub(crate) resident_since: std::time::Instant,
|
||||
pub(crate) evict_not_before: Instant,
|
||||
}
|
||||
|
||||
/// This task gets spawned alongside each timeline and is responsible for managing the timeline's
|
||||
@@ -202,7 +203,7 @@ pub async fn main_task(
|
||||
broker_active_set: Arc<TimelinesSet>,
|
||||
manager_tx: tokio::sync::mpsc::UnboundedSender<ManagerCtlMessage>,
|
||||
mut manager_rx: tokio::sync::mpsc::UnboundedReceiver<ManagerCtlMessage>,
|
||||
partial_backup_rate_limiter: RateLimiter,
|
||||
global_rate_limiter: RateLimiter,
|
||||
) {
|
||||
tli.set_status(Status::Started);
|
||||
|
||||
@@ -220,7 +221,7 @@ pub async fn main_task(
|
||||
conf,
|
||||
broker_active_set,
|
||||
manager_tx,
|
||||
partial_backup_rate_limiter,
|
||||
global_rate_limiter,
|
||||
)
|
||||
.await;
|
||||
|
||||
@@ -254,9 +255,29 @@ pub async fn main_task(
|
||||
mgr.set_status(Status::UpdatePartialBackup);
|
||||
mgr.update_partial_backup(&state_snapshot).await;
|
||||
|
||||
if mgr.conf.enable_offload && mgr.ready_for_eviction(&next_event, &state_snapshot) {
|
||||
mgr.set_status(Status::EvictTimeline);
|
||||
mgr.evict_timeline().await;
|
||||
let now = Instant::now();
|
||||
if mgr.evict_not_before > now {
|
||||
// we should wait until evict_not_before
|
||||
update_next_event(&mut next_event, mgr.evict_not_before);
|
||||
}
|
||||
|
||||
if mgr.conf.enable_offload
|
||||
&& mgr.evict_not_before <= now
|
||||
&& mgr.ready_for_eviction(&next_event, &state_snapshot)
|
||||
{
|
||||
// check rate limiter and evict timeline if possible
|
||||
match mgr.global_rate_limiter.try_acquire_eviction() {
|
||||
Some(_permit) => {
|
||||
mgr.set_status(Status::EvictTimeline);
|
||||
mgr.evict_timeline().await;
|
||||
}
|
||||
None => {
|
||||
// we can't evict timeline now, will try again later
|
||||
mgr.evict_not_before =
|
||||
Instant::now() + rand_duration(&mgr.conf.eviction_min_resident);
|
||||
update_next_event(&mut next_event, mgr.evict_not_before);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -334,11 +355,10 @@ impl Manager {
|
||||
conf: SafeKeeperConf,
|
||||
broker_active_set: Arc<TimelinesSet>,
|
||||
manager_tx: tokio::sync::mpsc::UnboundedSender<ManagerCtlMessage>,
|
||||
partial_backup_rate_limiter: RateLimiter,
|
||||
global_rate_limiter: RateLimiter,
|
||||
) -> Manager {
|
||||
let (is_offloaded, partial_backup_uploaded) = tli.bootstrap_mgr().await;
|
||||
Manager {
|
||||
conf,
|
||||
wal_seg_size: tli.get_wal_seg_size().await,
|
||||
walsenders: tli.get_walsenders().clone(),
|
||||
state_version_rx: tli.get_state_version_rx(),
|
||||
@@ -353,8 +373,10 @@ impl Manager {
|
||||
partial_backup_uploaded,
|
||||
access_service: AccessService::new(manager_tx),
|
||||
tli,
|
||||
partial_backup_rate_limiter,
|
||||
resident_since: std::time::Instant::now(),
|
||||
global_rate_limiter,
|
||||
// to smooth out evictions spike after restart
|
||||
evict_not_before: Instant::now() + rand_duration(&conf.eviction_min_resident),
|
||||
conf,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -522,8 +544,8 @@ impl Manager {
|
||||
|
||||
/// Spawns partial WAL backup task if needed.
|
||||
async fn update_partial_backup(&mut self, state: &StateSnapshot) {
|
||||
// check if partial backup is enabled and should be started
|
||||
if !self.conf.is_wal_backup_enabled() || !self.conf.partial_backup_enabled {
|
||||
// check if WAL backup is enabled and should be started
|
||||
if !self.conf.is_wal_backup_enabled() {
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -541,7 +563,7 @@ impl Manager {
|
||||
self.partial_backup_task = Some(tokio::spawn(wal_backup_partial::main_task(
|
||||
self.wal_resident_timeline(),
|
||||
self.conf.clone(),
|
||||
self.partial_backup_rate_limiter.clone(),
|
||||
self.global_rate_limiter.clone(),
|
||||
)));
|
||||
}
|
||||
|
||||
|
||||
@@ -2,10 +2,11 @@
|
||||
//! All timelines should always be present in this map, this is done by loading them
|
||||
//! all from the disk on startup and keeping them in memory.
|
||||
|
||||
use crate::defaults::DEFAULT_EVICTION_CONCURRENCY;
|
||||
use crate::rate_limit::RateLimiter;
|
||||
use crate::safekeeper::ServerInfo;
|
||||
use crate::timeline::{get_tenant_dir, get_timeline_dir, Timeline, TimelineError};
|
||||
use crate::timelines_set::TimelinesSet;
|
||||
use crate::wal_backup_partial::RateLimiter;
|
||||
use crate::SafeKeeperConf;
|
||||
use anyhow::{bail, Context, Result};
|
||||
use camino::Utf8PathBuf;
|
||||
@@ -31,7 +32,7 @@ struct GlobalTimelinesState {
|
||||
conf: Option<SafeKeeperConf>,
|
||||
broker_active_set: Arc<TimelinesSet>,
|
||||
load_lock: Arc<tokio::sync::Mutex<TimelineLoadLock>>,
|
||||
partial_backup_rate_limiter: RateLimiter,
|
||||
global_rate_limiter: RateLimiter,
|
||||
}
|
||||
|
||||
// Used to prevent concurrent timeline loading.
|
||||
@@ -50,7 +51,7 @@ impl GlobalTimelinesState {
|
||||
(
|
||||
self.get_conf().clone(),
|
||||
self.broker_active_set.clone(),
|
||||
self.partial_backup_rate_limiter.clone(),
|
||||
self.global_rate_limiter.clone(),
|
||||
)
|
||||
}
|
||||
|
||||
@@ -85,7 +86,7 @@ static TIMELINES_STATE: Lazy<Mutex<GlobalTimelinesState>> = Lazy::new(|| {
|
||||
conf: None,
|
||||
broker_active_set: Arc::new(TimelinesSet::default()),
|
||||
load_lock: Arc::new(tokio::sync::Mutex::new(TimelineLoadLock)),
|
||||
partial_backup_rate_limiter: RateLimiter::new(1),
|
||||
global_rate_limiter: RateLimiter::new(1, 1),
|
||||
})
|
||||
});
|
||||
|
||||
@@ -99,7 +100,10 @@ impl GlobalTimelines {
|
||||
// lock, so use explicit block
|
||||
let tenants_dir = {
|
||||
let mut state = TIMELINES_STATE.lock().unwrap();
|
||||
state.partial_backup_rate_limiter = RateLimiter::new(conf.partial_backup_concurrency);
|
||||
state.global_rate_limiter = RateLimiter::new(
|
||||
conf.partial_backup_concurrency,
|
||||
DEFAULT_EVICTION_CONCURRENCY,
|
||||
);
|
||||
state.conf = Some(conf);
|
||||
|
||||
// Iterate through all directories and load tenants for all directories
|
||||
|
||||
@@ -18,8 +18,6 @@
|
||||
//! This way control file stores information about all potentially existing
|
||||
//! remote partial segments and can clean them up after uploading a newer version.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use camino::Utf8PathBuf;
|
||||
use postgres_ffi::{XLogFileName, XLogSegNo, PG_TLI};
|
||||
use remote_storage::RemotePath;
|
||||
@@ -30,6 +28,7 @@ use utils::lsn::Lsn;
|
||||
|
||||
use crate::{
|
||||
metrics::{MISC_OPERATION_SECONDS, PARTIAL_BACKUP_UPLOADED_BYTES, PARTIAL_BACKUP_UPLOADS},
|
||||
rate_limit::{rand_duration, RateLimiter},
|
||||
safekeeper::Term,
|
||||
timeline::WalResidentTimeline,
|
||||
timeline_manager::StateSnapshot,
|
||||
@@ -37,30 +36,6 @@ use crate::{
|
||||
SafeKeeperConf,
|
||||
};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct RateLimiter {
|
||||
semaphore: Arc<tokio::sync::Semaphore>,
|
||||
}
|
||||
|
||||
impl RateLimiter {
|
||||
pub fn new(permits: usize) -> Self {
|
||||
Self {
|
||||
semaphore: Arc::new(tokio::sync::Semaphore::new(permits)),
|
||||
}
|
||||
}
|
||||
|
||||
async fn acquire_owned(&self) -> tokio::sync::OwnedSemaphorePermit {
|
||||
let _timer = MISC_OPERATION_SECONDS
|
||||
.with_label_values(&["partial_permit_acquire"])
|
||||
.start_timer();
|
||||
self.semaphore
|
||||
.clone()
|
||||
.acquire_owned()
|
||||
.await
|
||||
.expect("semaphore is closed")
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||
pub enum UploadStatus {
|
||||
/// Upload is in progress. This status should be used only for garbage collection,
|
||||
@@ -352,6 +327,7 @@ pub async fn main_task(
|
||||
) -> Option<PartialRemoteSegment> {
|
||||
debug!("started");
|
||||
let await_duration = conf.partial_backup_timeout;
|
||||
let mut first_iteration = true;
|
||||
|
||||
let (_, persistent_state) = tli.get_state().await;
|
||||
let mut commit_lsn_rx = tli.get_commit_lsn_watch_rx();
|
||||
@@ -419,6 +395,15 @@ pub async fn main_task(
|
||||
}
|
||||
}
|
||||
|
||||
// smoothing the load after restart, by sleeping for a random time.
|
||||
// if this is not the first iteration, we will wait for the full await_duration
|
||||
let await_duration = if first_iteration {
|
||||
first_iteration = false;
|
||||
rand_duration(&await_duration)
|
||||
} else {
|
||||
await_duration
|
||||
};
|
||||
|
||||
// fixing the segno and waiting some time to prevent reuploading the same segment too often
|
||||
let pending_segno = backup.segno(flush_lsn_rx.borrow().lsn);
|
||||
let timeout = tokio::time::sleep(await_duration);
|
||||
@@ -454,7 +439,7 @@ pub async fn main_task(
|
||||
}
|
||||
|
||||
// limit concurrent uploads
|
||||
let _upload_permit = limiter.acquire_owned().await;
|
||||
let _upload_permit = limiter.acquire_partial_backup().await;
|
||||
|
||||
let prepared = backup.prepare_upload().await;
|
||||
if let Some(seg) = &uploaded_segment {
|
||||
|
||||
@@ -181,7 +181,6 @@ pub fn run_server(os: NodeOs, disk: Arc<SafekeeperDisk>) -> Result<()> {
|
||||
sk_auth_token: None,
|
||||
current_thread_runtime: false,
|
||||
walsenders_keep_horizon: false,
|
||||
partial_backup_enabled: false,
|
||||
partial_backup_timeout: Duration::from_secs(0),
|
||||
disable_periodic_broker_push: false,
|
||||
enable_offload: false,
|
||||
|
||||
@@ -32,6 +32,7 @@ once_cell.workspace = true
|
||||
pageserver_api.workspace = true
|
||||
pageserver_client.workspace = true
|
||||
postgres_connection.workspace = true
|
||||
rand.workspace = true
|
||||
reqwest = { workspace = true, features = ["stream"] }
|
||||
routerify.workspace = true
|
||||
serde.workspace = true
|
||||
|
||||
@@ -9,12 +9,14 @@ use std::time::Duration;
|
||||
use storage_controller::http::make_router;
|
||||
use storage_controller::metrics::preinitialize_metrics;
|
||||
use storage_controller::persistence::Persistence;
|
||||
use storage_controller::service::chaos_injector::ChaosInjector;
|
||||
use storage_controller::service::{
|
||||
Config, Service, MAX_OFFLINE_INTERVAL_DEFAULT, MAX_WARMING_UP_INTERVAL_DEFAULT,
|
||||
RECONCILER_CONCURRENCY_DEFAULT,
|
||||
};
|
||||
use tokio::signal::unix::SignalKind;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::Instrument;
|
||||
use utils::auth::{JwtAuth, SwappableJwtAuth};
|
||||
use utils::logging::{self, LogFormat};
|
||||
|
||||
@@ -86,6 +88,10 @@ struct Cli {
|
||||
// TODO: make `cfg(feature = "testing")`
|
||||
#[arg(long)]
|
||||
neon_local_repo_dir: Option<PathBuf>,
|
||||
|
||||
/// Chaos testing
|
||||
#[arg(long)]
|
||||
chaos_interval: Option<humantime::Duration>,
|
||||
}
|
||||
|
||||
enum StrictMode {
|
||||
@@ -309,6 +315,22 @@ async fn async_main() -> anyhow::Result<()> {
|
||||
tracing::info!("Serving on {0}", args.listen);
|
||||
let server_task = tokio::task::spawn(server);
|
||||
|
||||
let chaos_task = args.chaos_interval.map(|interval| {
|
||||
let service = service.clone();
|
||||
let cancel = CancellationToken::new();
|
||||
let cancel_bg = cancel.clone();
|
||||
(
|
||||
tokio::task::spawn(
|
||||
async move {
|
||||
let mut chaos_injector = ChaosInjector::new(service, interval.into());
|
||||
chaos_injector.run(cancel_bg).await
|
||||
}
|
||||
.instrument(tracing::info_span!("chaos_injector")),
|
||||
),
|
||||
cancel,
|
||||
)
|
||||
});
|
||||
|
||||
// Wait until we receive a signal
|
||||
let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())?;
|
||||
let mut sigquit = tokio::signal::unix::signal(SignalKind::quit())?;
|
||||
@@ -337,6 +359,12 @@ async fn async_main() -> anyhow::Result<()> {
|
||||
}
|
||||
}
|
||||
|
||||
// If we were injecting chaos, stop that so that we're not calling into Service while it shuts down
|
||||
if let Some((chaos_jh, chaos_cancel)) = chaos_task {
|
||||
chaos_cancel.cancel();
|
||||
chaos_jh.await.ok();
|
||||
}
|
||||
|
||||
service.shutdown().await;
|
||||
tracing::info!("Service shutdown complete");
|
||||
|
||||
|
||||
@@ -84,6 +84,8 @@ use crate::{
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
pub mod chaos_injector;
|
||||
|
||||
// For operations that should be quick, like attaching a new tenant
|
||||
const SHORT_RECONCILE_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
|
||||
|
||||
71
storage_controller/src/service/chaos_injector.rs
Normal file
71
storage_controller/src/service/chaos_injector.rs
Normal file
@@ -0,0 +1,71 @@
|
||||
use std::{sync::Arc, time::Duration};
|
||||
|
||||
use rand::seq::SliceRandom;
|
||||
use rand::thread_rng;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use super::Service;
|
||||
|
||||
pub struct ChaosInjector {
|
||||
service: Arc<Service>,
|
||||
interval: Duration,
|
||||
}
|
||||
|
||||
impl ChaosInjector {
|
||||
pub fn new(service: Arc<Service>, interval: Duration) -> Self {
|
||||
Self { service, interval }
|
||||
}
|
||||
|
||||
pub async fn run(&mut self, cancel: CancellationToken) {
|
||||
let mut interval = tokio::time::interval(self.interval);
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = interval.tick() => {}
|
||||
_ = cancel.cancelled() => {
|
||||
tracing::info!("Shutting down");
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
self.inject_chaos().await;
|
||||
|
||||
tracing::info!("Chaos iteration...");
|
||||
}
|
||||
}
|
||||
|
||||
async fn inject_chaos(&mut self) {
|
||||
// Pick some shards to interfere with
|
||||
let batch_size = 128;
|
||||
let mut inner = self.service.inner.write().unwrap();
|
||||
let (nodes, tenants, scheduler) = inner.parts_mut();
|
||||
let tenant_ids = tenants.keys().cloned().collect::<Vec<_>>();
|
||||
let victims = tenant_ids.choose_multiple(&mut thread_rng(), batch_size);
|
||||
|
||||
for victim in victims {
|
||||
let shard = tenants
|
||||
.get_mut(victim)
|
||||
.expect("Held lock between choosing ID and this get");
|
||||
|
||||
// Pick a secondary to promote
|
||||
let Some(new_location) = shard
|
||||
.intent
|
||||
.get_secondary()
|
||||
.choose(&mut thread_rng())
|
||||
.cloned()
|
||||
else {
|
||||
tracing::info!("Skipping shard {victim}: no secondary location, can't migrate");
|
||||
continue;
|
||||
};
|
||||
|
||||
let Some(old_location) = *shard.intent.get_attached() else {
|
||||
tracing::info!("Skipping shard {victim}: currently has no attached location");
|
||||
continue;
|
||||
};
|
||||
|
||||
shard.intent.demote_attached(scheduler, old_location);
|
||||
shard.intent.promote_attached(scheduler, new_location);
|
||||
self.service.maybe_reconcile_shard(shard, nodes);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -92,7 +92,7 @@ pub(crate) async fn branch_cleanup_and_check_errors(
|
||||
.push(format!("index_part.json version: {}", index_part.version()))
|
||||
}
|
||||
|
||||
let mut newest_versions = IndexPart::KNOWN_VERSIONS.iter().rev().take(2);
|
||||
let mut newest_versions = IndexPart::KNOWN_VERSIONS.iter().rev().take(3);
|
||||
if !newest_versions.any(|ip| ip == &index_part.version()) {
|
||||
info!(
|
||||
"index_part.json version is not latest: {}",
|
||||
|
||||
@@ -4,7 +4,7 @@ use anyhow::{anyhow, Context};
|
||||
use async_stream::{stream, try_stream};
|
||||
use aws_sdk_s3::{types::ObjectIdentifier, Client};
|
||||
use futures::StreamExt;
|
||||
use remote_storage::{GenericRemoteStorage, ListingMode};
|
||||
use remote_storage::{GenericRemoteStorage, ListingMode, ListingObject, RemotePath};
|
||||
use tokio_stream::Stream;
|
||||
|
||||
use crate::{
|
||||
@@ -276,3 +276,33 @@ pub(crate) fn stream_listing<'a>(
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn stream_listing_generic<'a>(
|
||||
remote_client: &'a GenericRemoteStorage,
|
||||
target: &'a S3Target,
|
||||
) -> impl Stream<Item = anyhow::Result<(RemotePath, Option<ListingObject>)>> + 'a {
|
||||
let listing_mode = if target.delimiter.is_empty() {
|
||||
ListingMode::NoDelimiter
|
||||
} else {
|
||||
ListingMode::WithDelimiter
|
||||
};
|
||||
try_stream! {
|
||||
let mut objects_stream = std::pin::pin!(stream_objects_with_retries(
|
||||
remote_client,
|
||||
listing_mode,
|
||||
target,
|
||||
));
|
||||
while let Some(list) = objects_stream.next().await {
|
||||
let list = list?;
|
||||
if target.delimiter.is_empty() {
|
||||
for key in list.keys {
|
||||
yield (key.key.clone(), Some(key));
|
||||
}
|
||||
} else {
|
||||
for key in list.prefixes {
|
||||
yield (key, None);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
use std::{collections::HashSet, str::FromStr, sync::Arc};
|
||||
|
||||
use aws_sdk_s3::Client;
|
||||
use futures::stream::{StreamExt, TryStreamExt};
|
||||
use once_cell::sync::OnceCell;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use postgres_ffi::{XLogFileName, PG_TLI};
|
||||
use remote_storage::GenericRemoteStorage;
|
||||
use serde::Serialize;
|
||||
use tokio_postgres::types::PgLsn;
|
||||
use tracing::{error, info, trace};
|
||||
@@ -14,8 +14,9 @@ use utils::{
|
||||
};
|
||||
|
||||
use crate::{
|
||||
cloud_admin_api::CloudAdminApiClient, init_remote, metadata_stream::stream_listing,
|
||||
BucketConfig, ConsoleConfig, NodeKind, RootTarget, TenantShardTimelineId,
|
||||
cloud_admin_api::CloudAdminApiClient, init_remote_generic,
|
||||
metadata_stream::stream_listing_generic, BucketConfig, ConsoleConfig, NodeKind, RootTarget,
|
||||
TenantShardTimelineId,
|
||||
};
|
||||
|
||||
/// Generally we should ask safekeepers, but so far we use everywhere default 16MB.
|
||||
@@ -106,7 +107,7 @@ pub async fn scan_safekeeper_metadata(
|
||||
let timelines = client.query(&query, &[]).await?;
|
||||
info!("loaded {} timelines", timelines.len());
|
||||
|
||||
let (s3_client, target) = init_remote(bucket_config, NodeKind::Safekeeper).await?;
|
||||
let (remote_client, target) = init_remote_generic(bucket_config, NodeKind::Safekeeper).await?;
|
||||
let console_config = ConsoleConfig::from_env()?;
|
||||
let cloud_admin_api_client = CloudAdminApiClient::new(console_config);
|
||||
|
||||
@@ -119,7 +120,7 @@ pub async fn scan_safekeeper_metadata(
|
||||
let backup_lsn: Lsn = Lsn(u64::from(backup_lsn_pg));
|
||||
let ttid = TenantTimelineId::new(tenant_id, timeline_id);
|
||||
check_timeline(
|
||||
&s3_client,
|
||||
&remote_client,
|
||||
&target,
|
||||
&cloud_admin_api_client,
|
||||
ttid,
|
||||
@@ -156,7 +157,7 @@ struct TimelineCheckResult {
|
||||
/// errors are logged to stderr; returns Ok(true) if timeline is consistent,
|
||||
/// Ok(false) if not, Err if failed to check.
|
||||
async fn check_timeline(
|
||||
s3_client: &Client,
|
||||
remote_client: &GenericRemoteStorage,
|
||||
root: &RootTarget,
|
||||
api_client: &CloudAdminApiClient,
|
||||
ttid: TenantTimelineId,
|
||||
@@ -187,12 +188,13 @@ async fn check_timeline(
|
||||
// we need files, so unset it.
|
||||
timeline_dir_target.delimiter = String::new();
|
||||
|
||||
let mut stream = std::pin::pin!(stream_listing(s3_client, &timeline_dir_target));
|
||||
let mut stream = std::pin::pin!(stream_listing_generic(remote_client, &timeline_dir_target));
|
||||
while let Some(obj) = stream.next().await {
|
||||
let obj = obj?;
|
||||
let key = obj.key();
|
||||
let (key, _obj) = obj?;
|
||||
|
||||
let seg_name = key
|
||||
.get_path()
|
||||
.as_str()
|
||||
.strip_prefix(&timeline_dir_target.prefix_in_bucket)
|
||||
.expect("failed to extract segment name");
|
||||
expected_segfiles.remove(seg_name);
|
||||
|
||||
@@ -285,9 +285,9 @@ class NeonApiEndpoint:
|
||||
self.project_id = project_id
|
||||
eps = neon_api.get_endpoints(project_id)["endpoints"]
|
||||
self.endpoint_id = eps[0]["id"]
|
||||
self.connstr = neon_api.get_connection_uri(project_id, endpoint_id=self.endpoint_id)[
|
||||
"uri"
|
||||
]
|
||||
self.connstr = neon_api.get_connection_uri(
|
||||
project_id, endpoint_id=self.endpoint_id, pooled=False
|
||||
)["uri"]
|
||||
pw = self.connstr.split("@")[0].split(":")[-1]
|
||||
self.pgbench_env = {
|
||||
"PGHOST": eps[0]["host"],
|
||||
|
||||
@@ -1943,11 +1943,15 @@ class NeonCli(AbstractNeonCli):
|
||||
remote_ext_config: Optional[str] = None,
|
||||
pageserver_id: Optional[int] = None,
|
||||
allow_multiple=False,
|
||||
basebackup_request_tries: Optional[int] = None,
|
||||
) -> "subprocess.CompletedProcess[str]":
|
||||
args = [
|
||||
"endpoint",
|
||||
"start",
|
||||
]
|
||||
extra_env_vars = {}
|
||||
if basebackup_request_tries is not None:
|
||||
extra_env_vars["NEON_COMPUTE_TESTING_BASEBACKUP_TRIES"] = str(basebackup_request_tries)
|
||||
if remote_ext_config is not None:
|
||||
args.extend(["--remote-ext-config", remote_ext_config])
|
||||
|
||||
@@ -1960,7 +1964,7 @@ class NeonCli(AbstractNeonCli):
|
||||
if allow_multiple:
|
||||
args.extend(["--allow-multiple"])
|
||||
|
||||
res = self.raw_cli(args)
|
||||
res = self.raw_cli(args, extra_env_vars)
|
||||
res.check_returncode()
|
||||
return res
|
||||
|
||||
@@ -3812,6 +3816,7 @@ class Endpoint(PgProtocol, LogUtils):
|
||||
pageserver_id: Optional[int] = None,
|
||||
safekeepers: Optional[List[int]] = None,
|
||||
allow_multiple: bool = False,
|
||||
basebackup_request_tries: Optional[int] = None,
|
||||
) -> "Endpoint":
|
||||
"""
|
||||
Start the Postgres instance.
|
||||
@@ -3833,6 +3838,7 @@ class Endpoint(PgProtocol, LogUtils):
|
||||
remote_ext_config=remote_ext_config,
|
||||
pageserver_id=pageserver_id,
|
||||
allow_multiple=allow_multiple,
|
||||
basebackup_request_tries=basebackup_request_tries,
|
||||
)
|
||||
self._running.release(1)
|
||||
|
||||
@@ -3979,6 +3985,7 @@ class Endpoint(PgProtocol, LogUtils):
|
||||
remote_ext_config: Optional[str] = None,
|
||||
pageserver_id: Optional[int] = None,
|
||||
allow_multiple=False,
|
||||
basebackup_request_tries: Optional[int] = None,
|
||||
) -> "Endpoint":
|
||||
"""
|
||||
Create an endpoint, apply config, and start Postgres.
|
||||
@@ -3999,6 +4006,7 @@ class Endpoint(PgProtocol, LogUtils):
|
||||
remote_ext_config=remote_ext_config,
|
||||
pageserver_id=pageserver_id,
|
||||
allow_multiple=allow_multiple,
|
||||
basebackup_request_tries=basebackup_request_tries,
|
||||
)
|
||||
|
||||
log.info(f"Postgres startup took {time.time() - started_at} seconds")
|
||||
@@ -4042,6 +4050,7 @@ class EndpointFactory:
|
||||
config_lines: Optional[List[str]] = None,
|
||||
remote_ext_config: Optional[str] = None,
|
||||
pageserver_id: Optional[int] = None,
|
||||
basebackup_request_tries: Optional[int] = None,
|
||||
) -> Endpoint:
|
||||
ep = Endpoint(
|
||||
self.env,
|
||||
@@ -4060,6 +4069,7 @@ class EndpointFactory:
|
||||
lsn=lsn,
|
||||
remote_ext_config=remote_ext_config,
|
||||
pageserver_id=pageserver_id,
|
||||
basebackup_request_tries=basebackup_request_tries,
|
||||
)
|
||||
|
||||
def create(
|
||||
|
||||
@@ -556,6 +556,22 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
|
||||
assert isinstance(res_json, dict)
|
||||
return res_json
|
||||
|
||||
def timeline_block_gc(self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId):
|
||||
res = self.post(
|
||||
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/block_gc",
|
||||
)
|
||||
log.info(f"Got GC request response code: {res.status_code}")
|
||||
self.verbose_error(res)
|
||||
|
||||
def timeline_unblock_gc(
|
||||
self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId
|
||||
):
|
||||
res = self.post(
|
||||
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/unblock_gc",
|
||||
)
|
||||
log.info(f"Got GC request response code: {res.status_code}")
|
||||
self.verbose_error(res)
|
||||
|
||||
def timeline_compact(
|
||||
self,
|
||||
tenant_id: Union[TenantId, TenantShardId],
|
||||
@@ -663,6 +679,7 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
|
||||
force_image_layer_creation=False,
|
||||
wait_until_uploaded=False,
|
||||
compact: Optional[bool] = None,
|
||||
**kwargs,
|
||||
):
|
||||
self.is_testing_enabled_or_skip()
|
||||
query = {}
|
||||
@@ -680,6 +697,7 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
|
||||
res = self.put(
|
||||
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/checkpoint",
|
||||
params=query,
|
||||
**kwargs,
|
||||
)
|
||||
log.info(f"Got checkpoint request response code: {res.status_code}")
|
||||
self.verbose_error(res)
|
||||
|
||||
@@ -389,7 +389,10 @@ WaitUntilRet = TypeVar("WaitUntilRet")
|
||||
|
||||
|
||||
def wait_until(
|
||||
number_of_iterations: int, interval: float, func: Callable[[], WaitUntilRet]
|
||||
number_of_iterations: int,
|
||||
interval: float,
|
||||
func: Callable[[], WaitUntilRet],
|
||||
show_intermediate_error=False,
|
||||
) -> WaitUntilRet:
|
||||
"""
|
||||
Wait until 'func' returns successfully, without exception. Returns the
|
||||
@@ -402,6 +405,8 @@ def wait_until(
|
||||
except Exception as e:
|
||||
log.info("waiting for %s iteration %s failed", func, i + 1)
|
||||
last_exception = e
|
||||
if show_intermediate_error:
|
||||
log.info(e)
|
||||
time.sleep(interval)
|
||||
continue
|
||||
return res
|
||||
|
||||
22
test_runner/logical_repl/README.md
Normal file
22
test_runner/logical_repl/README.md
Normal file
@@ -0,0 +1,22 @@
|
||||
# Logical replication tests
|
||||
|
||||
## Clickhouse
|
||||
|
||||
```bash
|
||||
export BENCHMARK_CONNSTR=postgres://user:pass@ep-abc-xyz-123.us-east-2.aws.neon.build/neondb
|
||||
|
||||
docker compose -f clickhouse/docker-compose.yml up -d
|
||||
pytest -m remote_cluster -k test_clickhouse
|
||||
docker compose -f clickhouse/docker-compose.yml down
|
||||
```
|
||||
|
||||
## Debezium
|
||||
|
||||
```bash
|
||||
export BENCHMARK_CONNSTR=postgres://user:pass@ep-abc-xyz-123.us-east-2.aws.neon.build/neondb
|
||||
|
||||
docker compose -f debezium/docker-compose.yml up -d
|
||||
pytest -m remote_cluster -k test_debezium
|
||||
docker compose -f debezium/docker-compose.yml down
|
||||
|
||||
```
|
||||
9
test_runner/logical_repl/clickhouse/docker-compose.yml
Normal file
9
test_runner/logical_repl/clickhouse/docker-compose.yml
Normal file
@@ -0,0 +1,9 @@
|
||||
services:
|
||||
clickhouse:
|
||||
image: clickhouse/clickhouse-server
|
||||
user: "101:101"
|
||||
container_name: clickhouse
|
||||
hostname: clickhouse
|
||||
ports:
|
||||
- 127.0.0.1:8123:8123
|
||||
- 127.0.0.1:9000:9000
|
||||
24
test_runner/logical_repl/debezium/docker-compose.yml
Normal file
24
test_runner/logical_repl/debezium/docker-compose.yml
Normal file
@@ -0,0 +1,24 @@
|
||||
services:
|
||||
zookeeper:
|
||||
image: quay.io/debezium/zookeeper:2.7
|
||||
kafka:
|
||||
image: quay.io/debezium/kafka:2.7
|
||||
environment:
|
||||
ZOOKEEPER_CONNECT: "zookeeper:2181"
|
||||
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
|
||||
KAFKA_BROKER_ID: 1
|
||||
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
|
||||
KAFKA_JMX_PORT: 9991
|
||||
ports:
|
||||
- 127.0.0.1:9092:9092
|
||||
debezium:
|
||||
image: quay.io/debezium/connect:2.7
|
||||
environment:
|
||||
BOOTSTRAP_SERVERS: kafka:9092
|
||||
GROUP_ID: 1
|
||||
CONFIG_STORAGE_TOPIC: debezium-config
|
||||
OFFSET_STORAGE_TOPIC: debezium-offset
|
||||
STATUS_STORAGE_TOPIC: debezium-status
|
||||
DEBEZIUM_CONFIG_CONNECTOR_CLASS: io.debezium.connector.postgresql.PostgresConnector
|
||||
ports:
|
||||
- 127.0.0.1:8083:8083
|
||||
@@ -1,8 +1,9 @@
|
||||
"""
|
||||
Test the logical replication in Neon with the different consumers
|
||||
Test the logical replication in Neon with ClickHouse as a consumer
|
||||
"""
|
||||
|
||||
import hashlib
|
||||
import os
|
||||
import time
|
||||
|
||||
import clickhouse_connect
|
||||
@@ -39,22 +40,15 @@ def test_clickhouse(remote_pg: RemotePostgres):
|
||||
"""
|
||||
Test the logical replication having ClickHouse as a client
|
||||
"""
|
||||
clickhouse_host = "clickhouse" if ("CI" in os.environ) else "127.0.0.1"
|
||||
conn_options = remote_pg.conn_options()
|
||||
for _ in range(5):
|
||||
try:
|
||||
conn = psycopg2.connect(remote_pg.connstr())
|
||||
except psycopg2.OperationalError as perr:
|
||||
log.debug(perr)
|
||||
time.sleep(1)
|
||||
else:
|
||||
break
|
||||
raise TimeoutError
|
||||
conn = psycopg2.connect(remote_pg.connstr())
|
||||
cur = conn.cursor()
|
||||
cur.execute("DROP TABLE IF EXISTS table1")
|
||||
cur.execute("CREATE TABLE table1 (id integer primary key, column1 varchar(10));")
|
||||
cur.execute("INSERT INTO table1 (id, column1) VALUES (1, 'abc'), (2, 'def');")
|
||||
conn.commit()
|
||||
client = clickhouse_connect.get_client(host="clickhouse")
|
||||
client = clickhouse_connect.get_client(host=clickhouse_host)
|
||||
client.command("SET allow_experimental_database_materialized_postgresql=1")
|
||||
client.command(
|
||||
"CREATE DATABASE db1_postgres ENGINE = "
|
||||
189
test_runner/logical_repl/test_debezium.py
Normal file
189
test_runner/logical_repl/test_debezium.py
Normal file
@@ -0,0 +1,189 @@
|
||||
"""
|
||||
Test the logical replication in Neon with Debezium as a consumer
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import time
|
||||
|
||||
import psycopg2
|
||||
import pytest
|
||||
import requests
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import RemotePostgres
|
||||
from fixtures.utils import wait_until
|
||||
from kafka import KafkaConsumer
|
||||
|
||||
|
||||
class DebeziumAPI:
|
||||
"""
|
||||
The class for Debezium API calls
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.__host = "debezium" if ("CI" in os.environ) else "127.0.0.1"
|
||||
self.__base_url = f"http://{self.__host}:8083"
|
||||
self.__connectors_url = f"{self.__base_url}/connectors"
|
||||
|
||||
def __request(self, method, addurl="", **kwargs):
|
||||
return requests.request(
|
||||
method,
|
||||
self.__connectors_url + addurl,
|
||||
headers={"Accept": "application/json", "Content-type": "application/json"},
|
||||
timeout=60,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
def create_pg_connector(self, remote_pg: RemotePostgres, dbz_conn_name: str):
|
||||
"""
|
||||
Create a Postgres connector in debezium
|
||||
"""
|
||||
conn_options = remote_pg.conn_options()
|
||||
payload = {
|
||||
"name": dbz_conn_name,
|
||||
"config": {
|
||||
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
|
||||
"tasks.max": "1",
|
||||
"database.hostname": conn_options["host"],
|
||||
"database.port": "5432",
|
||||
"database.user": conn_options["user"],
|
||||
"database.password": conn_options["password"],
|
||||
"database.dbname": conn_options["dbname"],
|
||||
"plugin.name": "pgoutput",
|
||||
"topic.prefix": "dbserver1",
|
||||
"schema.include.list": "inventory",
|
||||
},
|
||||
}
|
||||
return self.__request("POST", json=payload)
|
||||
|
||||
def list_connectors(self):
|
||||
"""
|
||||
Returns a list of all connectors existent in Debezium.
|
||||
"""
|
||||
resp = self.__request("GET")
|
||||
assert resp.ok
|
||||
return json.loads(resp.text)
|
||||
|
||||
def del_connector(self, connector):
|
||||
"""
|
||||
Deletes the specified connector
|
||||
"""
|
||||
return self.__request("DELETE", f"/{connector}")
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def debezium(remote_pg: RemotePostgres):
|
||||
"""
|
||||
Prepare the Debezium API handler, connection
|
||||
"""
|
||||
conn = psycopg2.connect(remote_pg.connstr())
|
||||
cur = conn.cursor()
|
||||
cur.execute("DROP SCHEMA IF EXISTS inventory CASCADE")
|
||||
cur.execute("CREATE SCHEMA inventory")
|
||||
cur.execute(
|
||||
"CREATE TABLE inventory.customers ("
|
||||
"id SERIAL NOT NULL PRIMARY KEY,"
|
||||
"first_name character varying(255) NOT NULL,"
|
||||
"last_name character varying(255) NOT NULL,"
|
||||
"email character varying(255) NOT NULL)"
|
||||
)
|
||||
conn.commit()
|
||||
dbz = DebeziumAPI()
|
||||
assert len(dbz.list_connectors()) == 0
|
||||
dbz_conn_name = "inventory-connector"
|
||||
resp = dbz.create_pg_connector(remote_pg, dbz_conn_name)
|
||||
log.debug("%s %s %s", resp.status_code, resp.ok, resp.text)
|
||||
assert resp.status_code == 201
|
||||
assert len(dbz.list_connectors()) == 1
|
||||
consumer = KafkaConsumer(
|
||||
"dbserver1.inventory.customers",
|
||||
bootstrap_servers=["kafka:9092"],
|
||||
auto_offset_reset="earliest",
|
||||
enable_auto_commit=False,
|
||||
)
|
||||
yield conn, consumer
|
||||
resp = dbz.del_connector(dbz_conn_name)
|
||||
assert resp.status_code == 204
|
||||
|
||||
|
||||
def get_kafka_msg(consumer, ts_ms, before=None, after=None) -> None:
|
||||
"""
|
||||
Gets the message from Kafka and checks its validity
|
||||
Arguments:
|
||||
consumer: the consumer object
|
||||
ts_ms: timestamp in milliseconds of the change of db, the corresponding message must have
|
||||
the later timestamp
|
||||
before: a dictionary, if not None, the before field from the kafka message must
|
||||
have the same values for the same keys
|
||||
after: a dictionary, if not None, the after field from the kafka message must
|
||||
have the same values for the same keys
|
||||
"""
|
||||
msg = consumer.poll()
|
||||
assert msg, "Empty message"
|
||||
for val in msg.values():
|
||||
r = json.loads(val[-1].value)
|
||||
log.info(r["payload"])
|
||||
assert ts_ms < r["payload"]["ts_ms"], "Incorrect timestamp"
|
||||
for param, pname in ((before, "before"), (after, "after")):
|
||||
if param is not None:
|
||||
for k, v in param.items():
|
||||
assert r["payload"][pname][k] == v, f"{pname} mismatches"
|
||||
|
||||
|
||||
@pytest.mark.remote_cluster
|
||||
def test_debezium(debezium):
|
||||
"""
|
||||
Test the logical replication having Debezium as a subscriber
|
||||
"""
|
||||
conn, consumer = debezium
|
||||
cur = conn.cursor()
|
||||
ts_ms = time.time() * 1000
|
||||
log.info("Insert 1 ts_ms: %s", ts_ms)
|
||||
cur.execute(
|
||||
"insert into inventory.customers (first_name, last_name, email) "
|
||||
"values ('John', 'Dow','johndow@example.com')"
|
||||
)
|
||||
conn.commit()
|
||||
wait_until(
|
||||
100,
|
||||
0.5,
|
||||
lambda: get_kafka_msg(
|
||||
consumer,
|
||||
ts_ms,
|
||||
after={"first_name": "John", "last_name": "Dow", "email": "johndow@example.com"},
|
||||
),
|
||||
show_intermediate_error=True,
|
||||
)
|
||||
ts_ms = time.time() * 1000
|
||||
log.info("Insert 2 ts_ms: %s", ts_ms)
|
||||
cur.execute(
|
||||
"insert into inventory.customers (first_name, last_name, email) "
|
||||
"values ('Alex', 'Row','alexrow@example.com')"
|
||||
)
|
||||
conn.commit()
|
||||
wait_until(
|
||||
100,
|
||||
0.5,
|
||||
lambda: get_kafka_msg(
|
||||
consumer,
|
||||
ts_ms,
|
||||
after={"first_name": "Alex", "last_name": "Row", "email": "alexrow@example.com"},
|
||||
),
|
||||
show_intermediate_error=True,
|
||||
)
|
||||
ts_ms = time.time() * 1000
|
||||
log.info("Update ts_ms: %s", ts_ms)
|
||||
cur.execute("update inventory.customers set first_name = 'Alexander' where id = 2")
|
||||
conn.commit()
|
||||
wait_until(
|
||||
100,
|
||||
0.5,
|
||||
lambda: get_kafka_msg(
|
||||
consumer,
|
||||
ts_ms,
|
||||
after={"first_name": "Alexander"},
|
||||
),
|
||||
show_intermediate_error=True,
|
||||
)
|
||||
time.sleep(3)
|
||||
cur.execute("select 1")
|
||||
@@ -100,24 +100,32 @@ def test_subscriber_lag(
|
||||
pub_connstr = benchmark_project_pub.connstr
|
||||
sub_connstr = benchmark_project_sub.connstr
|
||||
|
||||
pg_bin.run_capture(["pgbench", "-i", "-s100"], env=pub_env)
|
||||
pg_bin.run_capture(["pgbench", "-i", "-s100"], env=sub_env)
|
||||
if benchmark_project_pub.is_new:
|
||||
pg_bin.run_capture(["pgbench", "-i", "-s100"], env=pub_env)
|
||||
if benchmark_project_sub.is_new:
|
||||
pg_bin.run_capture(["pgbench", "-i", "-s100"], env=sub_env)
|
||||
|
||||
pub_conn = psycopg2.connect(pub_connstr)
|
||||
sub_conn = psycopg2.connect(sub_connstr)
|
||||
pub_conn.autocommit = True
|
||||
sub_conn.autocommit = True
|
||||
with pub_conn.cursor() as pub_cur, sub_conn.cursor() as sub_cur:
|
||||
if benchmark_project_pub.is_new:
|
||||
pub_cur.execute("create publication pub1 for table pgbench_accounts, pgbench_history")
|
||||
pub_cur.execute("SELECT 1 FROM pg_catalog.pg_publication WHERE pubname = 'pub1'")
|
||||
pub_exists = len(pub_cur.fetchall()) != 0
|
||||
|
||||
if benchmark_project_sub.is_new:
|
||||
if not pub_exists:
|
||||
pub_cur.execute("CREATE PUBLICATION pub1 FOR TABLE pgbench_accounts, pgbench_history")
|
||||
|
||||
sub_cur.execute("SELECT 1 FROM pg_catalog.pg_subscription WHERE subname = 'sub1'")
|
||||
sub_exists = len(sub_cur.fetchall()) != 0
|
||||
if not sub_exists:
|
||||
sub_cur.execute("truncate table pgbench_accounts")
|
||||
sub_cur.execute("truncate table pgbench_history")
|
||||
|
||||
sub_cur.execute(f"create subscription sub1 connection '{pub_connstr}' publication pub1")
|
||||
sub_cur.execute(f"CREATE SUBSCRIPTION sub1 CONNECTION '{pub_connstr}' PUBLICATION pub1")
|
||||
|
||||
initial_sync_lag = measure_logical_replication_lag(sub_cur, pub_cur)
|
||||
|
||||
pub_conn.close()
|
||||
sub_conn.close()
|
||||
|
||||
@@ -195,10 +203,15 @@ def test_publisher_restart(
|
||||
pub_conn.autocommit = True
|
||||
sub_conn.autocommit = True
|
||||
with pub_conn.cursor() as pub_cur, sub_conn.cursor() as sub_cur:
|
||||
if benchmark_project_pub.is_new:
|
||||
pub_cur.execute("SELECT 1 FROM pg_catalog.pg_publication WHERE pubname = 'pub1'")
|
||||
pub_exists = len(pub_cur.fetchall()) != 0
|
||||
|
||||
if not pub_exists:
|
||||
pub_cur.execute("create publication pub1 for table pgbench_accounts, pgbench_history")
|
||||
|
||||
if benchmark_project_sub.is_new:
|
||||
sub_cur.execute("SELECT 1 FROM pg_catalog.pg_subscription WHERE subname = 'sub1'")
|
||||
sub_exists = len(sub_cur.fetchall()) != 0
|
||||
if not sub_exists:
|
||||
sub_cur.execute("truncate table pgbench_accounts")
|
||||
sub_cur.execute("truncate table pgbench_history")
|
||||
|
||||
|
||||
@@ -18,7 +18,6 @@ from fixtures.pageserver.utils import wait_until_tenant_active
|
||||
from fixtures.utils import query_scalar
|
||||
from performance.test_perf_pgbench import get_scales_matrix
|
||||
from requests import RequestException
|
||||
from requests.exceptions import RetryError
|
||||
|
||||
|
||||
# Test branch creation
|
||||
@@ -151,7 +150,7 @@ def test_cannot_create_endpoint_on_non_uploaded_timeline(neon_env_builder: NeonE
|
||||
env.pageserver.allowed_errors.extend(
|
||||
[
|
||||
".*request{method=POST path=/v1/tenant/.*/timeline request_id=.*}: request was dropped before completing.*",
|
||||
".*page_service_conn_main.*: query handler for 'basebackup .* is not active, state: Loading",
|
||||
".*page_service_conn_main.*: query handler for 'basebackup .* ERROR: Not found: Timeline",
|
||||
]
|
||||
)
|
||||
ps_http = env.pageserver.http_client()
|
||||
@@ -176,10 +175,12 @@ def test_cannot_create_endpoint_on_non_uploaded_timeline(neon_env_builder: NeonE
|
||||
|
||||
env.neon_cli.map_branch(initial_branch, env.initial_tenant, env.initial_timeline)
|
||||
|
||||
with pytest.raises(RuntimeError, match="is not active, state: Loading"):
|
||||
env.endpoints.create_start(initial_branch, tenant_id=env.initial_tenant)
|
||||
with pytest.raises(RuntimeError, match="ERROR: Not found: Timeline"):
|
||||
env.endpoints.create_start(
|
||||
initial_branch, tenant_id=env.initial_tenant, basebackup_request_tries=2
|
||||
)
|
||||
ps_http.configure_failpoints(("before-upload-index-pausable", "off"))
|
||||
finally:
|
||||
# FIXME: paused uploads bother shutdown
|
||||
env.pageserver.stop(immediate=True)
|
||||
|
||||
t.join()
|
||||
@@ -193,8 +194,11 @@ def test_cannot_branch_from_non_uploaded_branch(neon_env_builder: NeonEnvBuilder
|
||||
env = neon_env_builder.init_configs()
|
||||
env.start()
|
||||
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*request{method=POST path=/v1/tenant/.*/timeline request_id=.*}: request was dropped before completing.*"
|
||||
env.pageserver.allowed_errors.extend(
|
||||
[
|
||||
".*request{method=POST path=/v1/tenant/.*/timeline request_id=.*}: request was dropped before completing.*",
|
||||
".*request{method=POST path=/v1/tenant/.*/timeline request_id=.*}: .*Cannot branch off the timeline that's not present in pageserver.*",
|
||||
]
|
||||
)
|
||||
ps_http = env.pageserver.http_client()
|
||||
|
||||
@@ -216,7 +220,10 @@ def test_cannot_branch_from_non_uploaded_branch(neon_env_builder: NeonEnvBuilder
|
||||
|
||||
branch_id = TimelineId.generate()
|
||||
|
||||
with pytest.raises(RetryError, match="too many 503 error responses"):
|
||||
with pytest.raises(
|
||||
PageserverApiException,
|
||||
match="Cannot branch off the timeline that's not present in pageserver",
|
||||
):
|
||||
ps_http.timeline_create(
|
||||
env.pg_version,
|
||||
env.initial_tenant,
|
||||
|
||||
@@ -3,18 +3,15 @@ import re
|
||||
import shutil
|
||||
import subprocess
|
||||
import tempfile
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import List, Optional
|
||||
|
||||
import pytest
|
||||
import toml
|
||||
from fixtures.common_types import Lsn
|
||||
from fixtures.common_types import Lsn, TenantId, TimelineId
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnv,
|
||||
NeonEnvBuilder,
|
||||
PgBin,
|
||||
)
|
||||
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, PgBin
|
||||
from fixtures.pageserver.http import PageserverApiException
|
||||
from fixtures.pageserver.utils import (
|
||||
timeline_delete_wait_completed,
|
||||
@@ -22,7 +19,8 @@ from fixtures.pageserver.utils import (
|
||||
wait_for_upload,
|
||||
)
|
||||
from fixtures.pg_version import PgVersion
|
||||
from fixtures.remote_storage import RemoteStorageKind
|
||||
from fixtures.remote_storage import RemoteStorageKind, S3Storage, s3_storage
|
||||
from fixtures.workload import Workload
|
||||
|
||||
#
|
||||
# A test suite that help to prevent unintentionally breaking backward or forward compatibility between Neon releases.
|
||||
@@ -409,3 +407,133 @@ def dump_differs(
|
||||
break
|
||||
|
||||
return differs
|
||||
|
||||
|
||||
@dataclass
|
||||
class HistoricDataSet:
|
||||
name: str
|
||||
tenant_id: TenantId
|
||||
pg_version: PgVersion
|
||||
url: str
|
||||
|
||||
def __str__(self):
|
||||
return self.name
|
||||
|
||||
|
||||
HISTORIC_DATA_SETS = [
|
||||
# From before we enabled image layer compression.
|
||||
# - IndexPart::LATEST_VERSION 7
|
||||
# - STORAGE_FORMAT_VERSION 3
|
||||
HistoricDataSet(
|
||||
"2024-07-18",
|
||||
TenantId("17bf64a53509714687664b3a84e9b3ba"),
|
||||
PgVersion.V16,
|
||||
"https://neon-github-public-dev.s3.eu-central-1.amazonaws.com/compatibility-data-snapshots/2024-07-18-pgv16.tar.zst",
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
@pytest.mark.parametrize("dataset", HISTORIC_DATA_SETS)
|
||||
@pytest.mark.xdist_group("compatibility")
|
||||
def test_historic_storage_formats(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
test_output_dir: Path,
|
||||
pg_version: PgVersion,
|
||||
dataset: HistoricDataSet,
|
||||
):
|
||||
"""
|
||||
This test is like test_backward_compatibility, but it looks back further to examples of our storage format from long ago.
|
||||
"""
|
||||
|
||||
ARTIFACT_CACHE_DIR = "./artifact_cache"
|
||||
|
||||
import tarfile
|
||||
from contextlib import closing
|
||||
|
||||
import requests
|
||||
import zstandard
|
||||
|
||||
artifact_unpack_path = ARTIFACT_CACHE_DIR / Path("unpacked") / Path(dataset.name)
|
||||
|
||||
# Note: we assume that when running across a matrix of PG versions, the matrix includes all the versions needed by
|
||||
# HISTORIC_DATA_SETS. If we ever remove a PG version from the matrix, then historic datasets built using that version
|
||||
# will no longer be covered by this test.
|
||||
if pg_version != dataset.pg_version:
|
||||
pytest.skip(f"Dataset {dataset} is for different PG version, skipping")
|
||||
|
||||
with closing(requests.get(dataset.url, stream=True)) as r:
|
||||
unzstd = zstandard.ZstdDecompressor()
|
||||
with unzstd.stream_reader(r.raw) as stream:
|
||||
with tarfile.open(mode="r|", fileobj=stream) as tf:
|
||||
tf.extractall(artifact_unpack_path)
|
||||
|
||||
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
|
||||
neon_env_builder.pg_version = dataset.pg_version
|
||||
env = neon_env_builder.init_configs()
|
||||
env.start()
|
||||
assert isinstance(env.pageserver_remote_storage, S3Storage)
|
||||
|
||||
# Link artifact data into test's remote storage. We don't want the whole repo dir, just the remote storage part: we are not testing
|
||||
# compat of local disk data across releases (test_backward_compat does that), we're testing really long-lived data in S3 like layer files and indices.
|
||||
#
|
||||
# The code generating the snapshot uses local_fs, but this test uses S3Storage, so we are copying a tree of files into a bucket. We use
|
||||
# S3Storage so that the scrubber can run (the scrubber doesn't speak local_fs)
|
||||
artifact_pageserver_path = (
|
||||
artifact_unpack_path / Path("repo") / Path("local_fs_remote_storage") / Path("pageserver")
|
||||
)
|
||||
for root, _dirs, files in os.walk(artifact_pageserver_path):
|
||||
for file in files:
|
||||
local_path = os.path.join(root, file)
|
||||
remote_key = (
|
||||
env.pageserver_remote_storage.prefix_in_bucket
|
||||
+ str(local_path)[len(str(artifact_pageserver_path)) :]
|
||||
)
|
||||
log.info(f"Uploading {local_path} -> {remote_key}")
|
||||
env.pageserver_remote_storage.client.upload_file(
|
||||
local_path, env.pageserver_remote_storage.bucket_name, remote_key
|
||||
)
|
||||
|
||||
# Check the scrubber handles this old data correctly (can read it and doesn't consider it corrupt)
|
||||
#
|
||||
# Do this _before_ importing to the pageserver, as that import may start writing immediately
|
||||
metadata_summary = env.storage_scrubber.scan_metadata()
|
||||
assert metadata_summary["tenant_count"] >= 1
|
||||
assert metadata_summary["timeline_count"] >= 1
|
||||
assert not metadata_summary["with_errors"]
|
||||
assert not metadata_summary["with_warnings"]
|
||||
|
||||
env.neon_cli.import_tenant(dataset.tenant_id)
|
||||
|
||||
# Discover timelines
|
||||
timelines = env.pageserver.http_client().timeline_list(dataset.tenant_id)
|
||||
# All our artifacts should contain at least one timeline
|
||||
assert len(timelines) > 0
|
||||
|
||||
# TODO: ensure that the snapshots we're importing contain a sensible variety of content, at the very
|
||||
# least they should include a mixture of deltas and image layers. Preferably they should also
|
||||
# contain some "exotic" stuff like aux files from logical replication.
|
||||
|
||||
# Check we can start an endpoint and read the SQL that the artifact is meant to contain
|
||||
reference_sql_dump = artifact_unpack_path / Path("dump.sql")
|
||||
ep = env.endpoints.create_start("main", tenant_id=dataset.tenant_id)
|
||||
pg_bin = PgBin(test_output_dir, env.pg_distrib_dir, env.pg_version)
|
||||
pg_bin.run_capture(
|
||||
["pg_dumpall", f"--dbname={ep.connstr()}", f"--file={test_output_dir / 'dump.sql'}"]
|
||||
)
|
||||
assert not dump_differs(
|
||||
reference_sql_dump,
|
||||
test_output_dir / "dump.sql",
|
||||
test_output_dir / "dump.filediff",
|
||||
)
|
||||
ep.stop()
|
||||
|
||||
# Check we can also do writes to the database
|
||||
existing_timeline_id = TimelineId(timelines[0]["timeline_id"])
|
||||
workload = Workload(env, dataset.tenant_id, existing_timeline_id)
|
||||
workload.init()
|
||||
workload.write_rows(100)
|
||||
|
||||
# Check that compaction works
|
||||
env.pageserver.http_client().timeline_compact(
|
||||
dataset.tenant_id, existing_timeline_id, force_image_layer_creation=True
|
||||
)
|
||||
|
||||
@@ -12,7 +12,6 @@ from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
wait_for_last_flush_lsn,
|
||||
)
|
||||
from fixtures.pageserver.common_types import parse_layer_file_name
|
||||
from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient
|
||||
from fixtures.pageserver.utils import (
|
||||
timeline_delete_wait_completed,
|
||||
@@ -313,6 +312,7 @@ def test_remote_storage_upload_queue_retries(
|
||||
|
||||
def churn_while_failpoints_active(result):
|
||||
overwrite_data_and_wait_for_it_to_arrive_at_pageserver("c")
|
||||
# this call will wait for the failpoints to be turned off
|
||||
client.timeline_checkpoint(tenant_id, timeline_id)
|
||||
client.timeline_compact(tenant_id, timeline_id)
|
||||
overwrite_data_and_wait_for_it_to_arrive_at_pageserver("d")
|
||||
@@ -332,8 +332,8 @@ def test_remote_storage_upload_queue_retries(
|
||||
# Exponential back-off in upload queue, so, gracious timeouts.
|
||||
|
||||
wait_until(30, 1, lambda: assert_gt(get_queued_count(file_kind="layer", op_kind="upload"), 0))
|
||||
wait_until(30, 1, lambda: assert_ge(get_queued_count(file_kind="index", op_kind="upload"), 2))
|
||||
wait_until(30, 1, lambda: assert_gt(get_queued_count(file_kind="layer", op_kind="delete"), 0))
|
||||
wait_until(30, 1, lambda: assert_ge(get_queued_count(file_kind="index", op_kind="upload"), 1))
|
||||
wait_until(30, 1, lambda: assert_eq(get_queued_count(file_kind="layer", op_kind="delete"), 0))
|
||||
|
||||
# unblock churn operations
|
||||
configure_storage_sync_failpoints("off")
|
||||
@@ -769,11 +769,11 @@ def test_empty_branch_remote_storage_upload_on_restart(neon_env_builder: NeonEnv
|
||||
create_thread.join()
|
||||
|
||||
|
||||
def test_compaction_waits_for_upload(
|
||||
def test_paused_upload_stalls_checkpoint(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
):
|
||||
"""
|
||||
This test forces a race between upload and compaction.
|
||||
This test checks that checkpoints block on uploads to remote storage.
|
||||
"""
|
||||
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
|
||||
|
||||
@@ -788,6 +788,10 @@ def test_compaction_waits_for_upload(
|
||||
}
|
||||
)
|
||||
|
||||
env.pageserver.allowed_errors.append(
|
||||
f".*PUT.* path=/v1/tenant/{env.initial_tenant}/timeline.* request was dropped before completing"
|
||||
)
|
||||
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
|
||||
@@ -808,76 +812,9 @@ def test_compaction_waits_for_upload(
|
||||
endpoint.safe_psql("CREATE TABLE foo AS SELECT x FROM generate_series(1, 10000) g(x)")
|
||||
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
|
||||
|
||||
client.timeline_checkpoint(tenant_id, timeline_id)
|
||||
deltas_at_first = len(client.layer_map_info(tenant_id, timeline_id).delta_layers())
|
||||
assert (
|
||||
deltas_at_first == 2
|
||||
), "are you fixing #5863? just add one more checkpoint after 'CREATE TABLE bar ...' statement."
|
||||
|
||||
endpoint.safe_psql("CREATE TABLE bar AS SELECT x FROM generate_series(1, 10000) g(x)")
|
||||
endpoint.safe_psql("UPDATE foo SET x = 0 WHERE x = 1")
|
||||
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
|
||||
|
||||
layers_before_last_checkpoint = client.layer_map_info(tenant_id, timeline_id).historic_by_name()
|
||||
upload_stuck_layers = layers_before_last_checkpoint - layers_at_creation.historic_by_name()
|
||||
|
||||
assert len(upload_stuck_layers) > 0
|
||||
|
||||
for name in upload_stuck_layers:
|
||||
assert env.pageserver.layer_exists(
|
||||
tenant_id, timeline_id, parse_layer_file_name(name)
|
||||
), "while uploads are stuck the layers should be present on disk"
|
||||
|
||||
# now this will do the L0 => L1 compaction and want to remove
|
||||
# upload_stuck_layers and the original initdb L0
|
||||
client.timeline_checkpoint(tenant_id, timeline_id)
|
||||
|
||||
# as uploads are paused, the upload_stuck_layers should still be with us
|
||||
for name in upload_stuck_layers:
|
||||
assert env.pageserver.layer_exists(
|
||||
tenant_id, timeline_id, parse_layer_file_name(name)
|
||||
), "uploads are stuck still over compaction"
|
||||
|
||||
compacted_layers = client.layer_map_info(tenant_id, timeline_id).historic_by_name()
|
||||
overlap = compacted_layers.intersection(upload_stuck_layers)
|
||||
assert len(overlap) == 0, "none of the L0's should remain after L0 => L1 compaction"
|
||||
assert (
|
||||
len(compacted_layers) == 1
|
||||
), "there should be one L1 after L0 => L1 compaction (without #5863 being fixed)"
|
||||
|
||||
def layer_deletes_completed():
|
||||
m = client.get_metric_value("pageserver_layer_completed_deletes_total")
|
||||
if m is None:
|
||||
return 0
|
||||
return int(m)
|
||||
|
||||
# if initdb created an initial delta layer, it might already be gc'd
|
||||
# because it was uploaded before the failpoint was enabled. however, the
|
||||
# deletion is not guaranteed to be complete.
|
||||
assert layer_deletes_completed() <= 1
|
||||
|
||||
client.configure_failpoints(("before-upload-layer-pausable", "off"))
|
||||
|
||||
# Ensure that this actually terminates
|
||||
wait_upload_queue_empty(client, tenant_id, timeline_id)
|
||||
|
||||
def until_layer_deletes_completed():
|
||||
deletes = layer_deletes_completed()
|
||||
log.info(f"layer_deletes: {deletes}")
|
||||
# ensure that initdb delta layer AND the previously stuck are now deleted
|
||||
assert deletes >= len(upload_stuck_layers) + 1
|
||||
|
||||
wait_until(10, 1, until_layer_deletes_completed)
|
||||
|
||||
for name in upload_stuck_layers:
|
||||
assert not env.pageserver.layer_exists(
|
||||
tenant_id, timeline_id, parse_layer_file_name(name)
|
||||
), "l0 should now be removed because of L0 => L1 compaction and completed uploads"
|
||||
|
||||
# We should not have hit the error handling path in uploads where a uploaded file is gone
|
||||
assert not env.pageserver.log_contains(
|
||||
"File to upload doesn't exist. Likely the file has been deleted and an upload is not required any more."
|
||||
)
|
||||
with pytest.raises(ReadTimeout):
|
||||
client.timeline_checkpoint(tenant_id, timeline_id, timeout=5)
|
||||
client.configure_failpoints(("before-upload-layer-pausable", "off"))
|
||||
|
||||
|
||||
def wait_upload_queue_empty(
|
||||
|
||||
67
test_runner/regress/test_timeline_gc_blocking.py
Normal file
67
test_runner/regress/test_timeline_gc_blocking.py
Normal file
@@ -0,0 +1,67 @@
|
||||
import time
|
||||
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
)
|
||||
from fixtures.pageserver.utils import wait_timeline_detail_404
|
||||
|
||||
|
||||
def test_gc_blocking_by_timeline(neon_env_builder: NeonEnvBuilder):
|
||||
env = neon_env_builder.init_start(
|
||||
initial_tenant_conf={"gc_period": "1s", "lsn_lease_length": "0s"}
|
||||
)
|
||||
ps = env.pageserver
|
||||
http = ps.http_client()
|
||||
|
||||
foo_branch = env.neon_cli.create_branch("foo", "main", env.initial_tenant)
|
||||
|
||||
gc_active_line = ".* gc_loop.*: [12] timelines need GC"
|
||||
gc_skipped_line = ".* gc_loop.*: Skipping GC: .*"
|
||||
init_gc_skipped = ".*: initialized with gc blocked.*"
|
||||
|
||||
tenant_before = http.tenant_status(env.initial_tenant)
|
||||
|
||||
wait_for_another_gc_round()
|
||||
_, offset = ps.assert_log_contains(gc_active_line)
|
||||
|
||||
assert ps.log_contains(gc_skipped_line, offset) is None
|
||||
|
||||
http.timeline_block_gc(env.initial_tenant, foo_branch)
|
||||
|
||||
tenant_after = http.tenant_status(env.initial_tenant)
|
||||
assert tenant_before != tenant_after
|
||||
gc_blocking = tenant_after["gc_blocking"]
|
||||
assert gc_blocking == "BlockingReasons { timelines: 1, reasons: EnumSet(Manual) }"
|
||||
|
||||
wait_for_another_gc_round()
|
||||
_, offset = ps.assert_log_contains(gc_skipped_line, offset)
|
||||
|
||||
ps.restart()
|
||||
ps.quiesce_tenants()
|
||||
|
||||
_, offset = env.pageserver.assert_log_contains(init_gc_skipped, offset)
|
||||
|
||||
wait_for_another_gc_round()
|
||||
_, offset = ps.assert_log_contains(gc_skipped_line, offset)
|
||||
|
||||
# deletion unblocks gc
|
||||
http.timeline_delete(env.initial_tenant, foo_branch)
|
||||
wait_timeline_detail_404(http, env.initial_tenant, foo_branch, 10, 1.0)
|
||||
|
||||
wait_for_another_gc_round()
|
||||
_, offset = ps.assert_log_contains(gc_active_line, offset)
|
||||
|
||||
http.timeline_block_gc(env.initial_tenant, env.initial_timeline)
|
||||
|
||||
wait_for_another_gc_round()
|
||||
_, offset = ps.assert_log_contains(gc_skipped_line, offset)
|
||||
|
||||
# removing the manual block also unblocks gc
|
||||
http.timeline_unblock_gc(env.initial_tenant, env.initial_timeline)
|
||||
|
||||
wait_for_another_gc_round()
|
||||
_, offset = ps.assert_log_contains(gc_active_line, offset)
|
||||
|
||||
|
||||
def wait_for_another_gc_round():
|
||||
time.sleep(2)
|
||||
@@ -936,6 +936,9 @@ def test_timeline_logical_size_task_priority(neon_env_builder: NeonEnvBuilder):
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
|
||||
# just make sure this doesn't hit an assertion
|
||||
client.timeline_detail(tenant_id, timeline_id, force_await_initial_logical_size=True)
|
||||
|
||||
# load in some data
|
||||
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
|
||||
endpoint.safe_psql_many(
|
||||
|
||||
2
vendor/postgres-v14
vendored
2
vendor/postgres-v14
vendored
Submodule vendor/postgres-v14 updated: dbd0e6428b...7bbe834c8c
2
vendor/postgres-v15
vendored
2
vendor/postgres-v15
vendored
Submodule vendor/postgres-v15 updated: 035b73a9c5...9eba7dd382
2
vendor/postgres-v16
vendored
2
vendor/postgres-v16
vendored
Submodule vendor/postgres-v16 updated: b39f316137...5377f5ed72
6
vendor/revisions.json
vendored
6
vendor/revisions.json
vendored
@@ -1,5 +1,5 @@
|
||||
{
|
||||
"v16": ["16.3", "b39f316137fdd29e2da15d2af2fdd1cfd18163be"],
|
||||
"v15": ["15.7", "035b73a9c5998f9a0ef35cc8df1bae680bf770fc"],
|
||||
"v14": ["14.12", "dbd0e6428b9274d72a10ac29bd3e3162faf109d4"]
|
||||
"v16": ["16.3", "5377f5ed7290af45b7cb6b0d98d43cbf4a4e77f3"],
|
||||
"v15": ["15.7", "9eba7dd382606ffca43aca865f337ec21bcdac73"],
|
||||
"v14": ["14.12", "7bbe834c8c2dc37802eca8484311599bc47341f6"]
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user