Mirror of https://github.com/neondatabase/neon.git, synced 2025-12-27 08:09:58 +00:00
pageserver: add and stabilize import chaos test (#11982)
## Problem

Test coverage of timeline imports is lacking.

## Summary of changes

This PR adds a chaos import test. It runs an import while injecting various chaos events in the environment. All the commits that follow the test fix various issues that were surfaced by it.

Closes https://github.com/neondatabase/neon/issues/10191
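To illustrate the shape of the chaos test added below, here is a minimal, self-contained sketch of the pattern: one thread stands in for the import while another injects random faults until the import signals completion. The names in this sketch (`run_import`, `inject_chaos`) are illustrative stand-ins, not part of the Neon test fixtures.

```python
# Sketch of the chaos-test pattern, not the actual test: a worker thread does
# resumable work while a chaos thread injects random faults until told to stop.
import random
import threading
import time

def run_import(done: threading.Event) -> None:
    # Stand-in for the import: do some work in small, resumable steps.
    for _ in range(20):
        time.sleep(0.1)
    done.set()

def inject_chaos(stop: threading.Event) -> None:
    # Pick a random fault until asked to stop; the real test migrates shards
    # and restarts pageservers or the storage controller here.
    while not stop.is_set():
        fault = random.choice(["restart", "restart_immediate", "migrate"])
        print(f"CHAOS: {fault}")
        time.sleep(1)

done = threading.Event()
stop_chaos = threading.Event()
importer = threading.Thread(target=run_import, args=(done,))
chaos = threading.Thread(target=inject_chaos, args=(stop_chaos,))
importer.start()
chaos.start()
try:
    assert done.wait(timeout=30), "import did not finish in time"
finally:
    stop_chaos.set()
    chaos.join()
    importer.join()
```

The real test (`test_import_chaos` in the diff below) replaces the stand-ins with shard migrations and pageserver / storage controller restarts, and validates the imported data afterwards.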
@@ -370,6 +370,18 @@ impl From<crate::tenant::secondary::SecondaryTenantError> for ApiError {
    }
}

impl From<crate::tenant::FinalizeTimelineImportError> for ApiError {
    fn from(err: crate::tenant::FinalizeTimelineImportError) -> ApiError {
        use crate::tenant::FinalizeTimelineImportError::*;
        match err {
            ImportTaskStillRunning => {
                ApiError::ResourceUnavailable("Import task still running".into())
            }
            ShuttingDown => ApiError::ShuttingDown,
        }
    }
}

// Helper function to construct a TimelineInfo struct for a timeline
async fn build_timeline_info(
    timeline: &Arc<Timeline>,
@@ -3533,10 +3545,7 @@ async fn activate_post_import_handler(

    tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;

    tenant
        .finalize_importing_timeline(timeline_id)
        .await
        .map_err(ApiError::InternalServerError)?;
    tenant.finalize_importing_timeline(timeline_id).await?;

    match tenant.get_timeline(timeline_id, false) {
        Ok(_timeline) => {

@@ -864,6 +864,14 @@ impl Debug for SetStoppingError {
    }
}

#[derive(thiserror::Error, Debug)]
pub(crate) enum FinalizeTimelineImportError {
    #[error("Import task not done yet")]
    ImportTaskStillRunning,
    #[error("Shutting down")]
    ShuttingDown,
}

/// Arguments to [`TenantShard::create_timeline`].
///
/// Not usable as an idempotency key for timeline creation because if [`CreateTimelineParamsBranch::ancestor_start_lsn`]
@@ -1150,10 +1158,20 @@ impl TenantShard {
            ctx,
        )?;
        let disk_consistent_lsn = timeline.get_disk_consistent_lsn();
        anyhow::ensure!(
            disk_consistent_lsn.is_valid(),
            "Timeline {tenant_id}/{timeline_id} has invalid disk_consistent_lsn"
        );

        if !disk_consistent_lsn.is_valid() {
            // As opposed to normal timelines which get initialised with a disk consistent LSN
            // via initdb, imported timelines start from 0. If the import task stops before
            // it advances disk consistent LSN, allow it to resume.
            let in_progress_import = import_pgdata
                .as_ref()
                .map(|import| !import.is_done())
                .unwrap_or(false);
            if !in_progress_import {
                anyhow::bail!("Timeline {tenant_id}/{timeline_id} has invalid disk_consistent_lsn");
            }
        }

        assert_eq!(
            disk_consistent_lsn,
            metadata.disk_consistent_lsn(),
@@ -1247,20 +1265,25 @@ impl TenantShard {
            }
        }

        // Sanity check: a timeline should have some content.
        anyhow::ensure!(
            ancestor.is_some()
                || timeline
                    .layers
                    .read()
                    .await
                    .layer_map()
                    .expect("currently loading, layer manager cannot be shutdown already")
                    .iter_historic_layers()
                    .next()
                    .is_some(),
            "Timeline has no ancestor and no layer files"
        );
        if disk_consistent_lsn.is_valid() {
            // Sanity check: a timeline should have some content.
            // Exception: importing timelines might not yet have any
            anyhow::ensure!(
                ancestor.is_some()
                    || timeline
                        .layers
                        .read()
                        .await
                        .layer_map()
                        .expect(
                            "currently loading, layer manager cannot be shutdown already"
                        )
                        .iter_historic_layers()
                        .next()
                        .is_some(),
                "Timeline has no ancestor and no layer files"
            );
        }

        Ok(TimelineInitAndSyncResult::ReadyToActivate)
    }
@@ -2860,13 +2883,13 @@ impl TenantShard {
    pub(crate) async fn finalize_importing_timeline(
        &self,
        timeline_id: TimelineId,
    ) -> anyhow::Result<()> {
    ) -> Result<(), FinalizeTimelineImportError> {
        let timeline = {
            let locked = self.timelines_importing.lock().unwrap();
            match locked.get(&timeline_id) {
                Some(importing_timeline) => {
                    if !importing_timeline.import_task_handle.is_finished() {
                        return Err(anyhow::anyhow!("Import task not done yet"));
                        return Err(FinalizeTimelineImportError::ImportTaskStillRunning);
                    }

                    importing_timeline.timeline.clone()
@@ -2879,8 +2902,13 @@ impl TenantShard {

        timeline
            .remote_client
            .schedule_index_upload_for_import_pgdata_finalize()?;
        timeline.remote_client.wait_completion().await?;
            .schedule_index_upload_for_import_pgdata_finalize()
            .map_err(|_err| FinalizeTimelineImportError::ShuttingDown)?;
        timeline
            .remote_client
            .wait_completion()
            .await
            .map_err(|_err| FinalizeTimelineImportError::ShuttingDown)?;

        self.timelines_importing
            .lock()
@@ -3484,8 +3512,9 @@ impl TenantShard {
            let mut timelines_importing = self.timelines_importing.lock().unwrap();
            timelines_importing
                .drain()
                .for_each(|(_timeline_id, importing_timeline)| {
                    importing_timeline.shutdown();
                .for_each(|(timeline_id, importing_timeline)| {
                    let span = tracing::info_span!("importing_timeline_shutdown", %timeline_id);
                    js.spawn(async move { importing_timeline.shutdown().instrument(span).await });
                });
        }
        // test_long_timeline_create_then_tenant_delete is leaning on this message

@@ -25,8 +25,11 @@ pub(crate) struct ImportingTimeline {
}

impl ImportingTimeline {
    pub(crate) fn shutdown(self) {
    pub(crate) async fn shutdown(self) {
        self.import_task_handle.abort();
        let _ = self.import_task_handle.await;

        self.timeline.remote_client.shutdown().await;
    }
}

@@ -93,6 +96,11 @@ pub async fn doit(
        );
    }

    timeline
        .remote_client
        .schedule_index_upload_for_file_changes()?;
    timeline.remote_client.wait_completion().await?;

    // Communicate that shard is done.
    // Ensure at-least-once delivery of the upcall to storage controller
    // before we mark the task as done and never come here again.

@@ -113,14 +113,14 @@ async fn run_v1(
    let plan_hash = hasher.finish();

    if let Some(progress) = &import_progress {
        if plan_hash != progress.import_plan_hash {
            anyhow::bail!("Import plan does not match storcon metadata");
        }

        // Handle collisions on jobs of unequal length
        if progress.jobs != plan.jobs.len() {
            anyhow::bail!("Import plan job length does not match storcon metadata")
        }

        if plan_hash != progress.import_plan_hash {
            anyhow::bail!("Import plan does not match storcon metadata");
        }
    }

    pausable_failpoint!("import-timeline-pre-execute-pausable");
@@ -218,6 +218,19 @@ impl Planner {
            checkpoint_buf,
        )));

        // Sort the tasks by the key ranges they handle.
        // The plan being generated here needs to be stable across invocations
        // of this method.
        self.tasks.sort_by_key(|task| match task {
            AnyImportTask::SingleKey(key) => (key.key, key.key.next()),
            AnyImportTask::RelBlocks(rel_blocks) => {
                (rel_blocks.key_range.start, rel_blocks.key_range.end)
            }
            AnyImportTask::SlruBlocks(slru_blocks) => {
                (slru_blocks.key_range.start, slru_blocks.key_range.end)
            }
        });

        // Assigns parts of key space to later parallel jobs
        let mut last_end_key = Key::MIN;
        let mut current_chunk = Vec::new();
@@ -426,6 +439,8 @@ impl Plan {
                }));
            },
            maybe_complete_job_idx = work.next() => {
                pausable_failpoint!("import-task-complete-pausable");

                match maybe_complete_job_idx {
                    Some(Ok((job_idx, res))) => {
                        assert!(last_completed_job_idx.checked_add(1).unwrap() == job_idx);
@@ -440,6 +455,9 @@ impl Plan {
                import_plan_hash,
            };

            timeline.remote_client.schedule_index_upload_for_file_changes()?;
            timeline.remote_client.wait_completion().await?;

            storcon_client.put_timeline_import_status(
                timeline.tenant_shard_id,
                timeline.timeline_id,
@@ -640,7 +658,11 @@ impl Hash for ImportSingleKeyTask {
        let ImportSingleKeyTask { key, buf } = self;

        key.hash(state);
        buf.hash(state);
        // The key value might not have a stable binary representation.
        // For instance, the db directory uses an unstable hash-map.
        // To work around this we are a bit lax here and only hash the
        // size of the buffer which must be consistent.
        buf.len().hash(state);
    }
}

@@ -915,7 +937,7 @@ impl ChunkProcessingJob {
        let guard = timeline.layers.read().await;
        let existing_layer = guard.try_get_from_key(&desc.key());
        if let Some(layer) = existing_layer {
            if layer.metadata().generation != timeline.generation {
            if layer.metadata().generation == timeline.generation {
                return Err(anyhow::anyhow!(
                    "Import attempted to rewrite layer file in the same generation: {}",
                    layer.local_path()

@@ -3922,6 +3922,11 @@ impl Service {
        })
    }

    #[instrument(skip_all, fields(
        tenant_id=%req.tenant_shard_id.tenant_id,
        shard_id=%req.tenant_shard_id.shard_slug(),
        timeline_id=%req.timeline_id,
    ))]
    pub(crate) async fn handle_timeline_shard_import_progress(
        self: &Arc<Self>,
        req: TimelineImportStatusRequest,
@@ -3971,6 +3976,11 @@ impl Service {
        })
    }

    #[instrument(skip_all, fields(
        tenant_id=%req.tenant_shard_id.tenant_id,
        shard_id=%req.tenant_shard_id.shard_slug(),
        timeline_id=%req.timeline_id,
    ))]
    pub(crate) async fn handle_timeline_shard_import_progress_upcall(
        self: &Arc<Self>,
        req: PutTimelineImportStatusRequest,

@@ -404,6 +404,29 @@ class PageserverTracingConfig:
        return ("tracing", value)


@dataclass
class PageserverImportConfig:
    import_job_concurrency: int
    import_job_soft_size_limit: int
    import_job_checkpoint_threshold: int

    @staticmethod
    def default() -> PageserverImportConfig:
        return PageserverImportConfig(
            import_job_concurrency=4,
            import_job_soft_size_limit=512 * 1024,
            import_job_checkpoint_threshold=4,
        )

    def to_config_key_value(self) -> tuple[str, dict[str, Any]]:
        value = {
            "import_job_concurrency": self.import_job_concurrency,
            "import_job_soft_size_limit": self.import_job_soft_size_limit,
            "import_job_checkpoint_threshold": self.import_job_checkpoint_threshold,
        }
        return ("timeline_import_config", value)


class NeonEnvBuilder:
    """
    Builder object to create a Neon runtime environment
@@ -454,6 +477,7 @@ class NeonEnvBuilder:
        pageserver_wal_receiver_protocol: PageserverWalReceiverProtocol | None = None,
        pageserver_get_vectored_concurrent_io: str | None = None,
        pageserver_tracing_config: PageserverTracingConfig | None = None,
        pageserver_import_config: PageserverImportConfig | None = None,
    ):
        self.repo_dir = repo_dir
        self.rust_log_override = rust_log_override
@@ -511,6 +535,7 @@ class NeonEnvBuilder:
        )

        self.pageserver_tracing_config = pageserver_tracing_config
        self.pageserver_import_config = pageserver_import_config

        self.pageserver_default_tenant_config_compaction_algorithm: dict[str, Any] | None = (
            pageserver_default_tenant_config_compaction_algorithm
@@ -1179,6 +1204,10 @@ class NeonEnv:
        self.pageserver_wal_receiver_protocol = config.pageserver_wal_receiver_protocol
        self.pageserver_get_vectored_concurrent_io = config.pageserver_get_vectored_concurrent_io
        self.pageserver_tracing_config = config.pageserver_tracing_config
        if config.pageserver_import_config is None:
            self.pageserver_import_config = PageserverImportConfig.default()
        else:
            self.pageserver_import_config = config.pageserver_import_config

        # Create the neon_local's `NeonLocalInitConf`
        cfg: dict[str, Any] = {
@@ -1258,12 +1287,6 @@ class NeonEnv:
            "no_sync": True,
            # Look for gaps in WAL received from safekeepers
            "validate_wal_contiguity": True,
            # TODO(vlad): make these configurable through the builder
            "timeline_import_config": {
                "import_job_concurrency": 4,
                "import_job_soft_size_limit": 512 * 1024,
                "import_job_checkpoint_threshold": 4,
            },
        }

        # Batching (https://github.com/neondatabase/neon/issues/9377):
@@ -1325,6 +1348,12 @@ class NeonEnv:

            ps_cfg[key] = value

            if self.pageserver_import_config is not None:
                key, value = self.pageserver_import_config.to_config_key_value()

                if key not in ps_cfg:
                    ps_cfg[key] = value

            # Create a corresponding NeonPageserver object
            ps = NeonPageserver(
                self, ps_id, port=pageserver_port, az_id=ps_cfg["availability_zone"]

@@ -1,7 +1,10 @@
import base64
import concurrent.futures
import json
import random
import threading
import time
from enum import Enum
from enum import Enum, StrEnum
from pathlib import Path
from threading import Event

@@ -11,7 +14,14 @@ import pytest
from fixtures.common_types import Lsn, TenantId, TenantShardId, TimelineId
from fixtures.fast_import import FastImport
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin, PgProtocol, VanillaPostgres
from fixtures.neon_fixtures import (
    NeonEnvBuilder,
    PageserverImportConfig,
    PgBin,
    PgProtocol,
    StorageControllerMigrationConfig,
    VanillaPostgres,
)
from fixtures.pageserver.http import (
    ImportPgdataIdemptencyKey,
)
@@ -494,6 +504,259 @@ def test_import_respects_tenant_shutdown(
    wait_until(cplane_notified)


@skip_in_debug_build("Validation query takes too long in debug builds")
def test_import_chaos(
    neon_env_builder: NeonEnvBuilder, vanilla_pg: VanillaPostgres, make_httpserver: HTTPServer
):
    """
    Perform a timeline import while injecting chaos in the environment.
    We expect that the import completes eventually, that it passes validation and
    the resulting timeline can be written to.
    """
    TARGET_RELBOCK_SIZE = 512 * 1024 * 1024  # 512 MiB
    ALLOWED_IMPORT_RUNTIME = 90  # seconds
    SHARD_COUNT = 4

    neon_env_builder.num_pageservers = SHARD_COUNT
    neon_env_builder.pageserver_import_config = PageserverImportConfig(
        import_job_concurrency=1,
        import_job_soft_size_limit=64 * 1024,
        import_job_checkpoint_threshold=4,
    )

    # Set up mock control plane HTTP server to listen for import completions
    import_completion_signaled = Event()
    # There's some Python magic at play here. A list can be updated from the
    # handler thread, but an optional cannot. Hence, use a list with one element.
    import_error = []

    def handler(request: Request) -> Response:
        assert request.json is not None

        body = request.json
        if "error" in body:
            if body["error"]:
                import_error.append(body["error"])

        log.info(f"control plane /import_complete request: {request.json}")
        import_completion_signaled.set()
        return Response(json.dumps({}), status=200)

    cplane_mgmt_api_server = make_httpserver
    cplane_mgmt_api_server.expect_request(
        "/storage/api/v1/import_complete", method="PUT"
    ).respond_with_handler(handler)

    # Plug the cplane mock in
    neon_env_builder.control_plane_hooks_api = (
        f"http://{cplane_mgmt_api_server.host}:{cplane_mgmt_api_server.port}/storage/api/v1/"
    )

    # The import will specify a local filesystem path mocking remote storage
    neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)

    vanilla_pg.start()
    vanilla_pg.safe_psql("create user cloud_admin with password 'postgres' superuser")
    vanilla_pg.safe_psql("""create table t (data char(900)) with (fillfactor = 10)""")

    nrows = 0
    while True:
        relblock_size = vanilla_pg.safe_psql_scalar("select pg_relation_size('t')")
        log.info(
            f"relblock size: {relblock_size / 8192} pages (target: {TARGET_RELBOCK_SIZE // 8192}) pages"
        )
        if relblock_size >= TARGET_RELBOCK_SIZE:
            break
        addrows = int((TARGET_RELBOCK_SIZE - relblock_size) // 8192)
        assert addrows >= 1, "forward progress"
        vanilla_pg.safe_psql(
            f"insert into t select generate_series({nrows + 1}, {nrows + addrows})"
        )
        nrows += addrows

    vanilla_pg.stop()

    env = neon_env_builder.init_configs()
    env.start()

    # Pause after every import task to extend the test runtime and allow
    # for more chaos injection.
    for ps in env.pageservers:
        ps.add_persistent_failpoint("import-task-complete-pausable", "sleep(5)")

    env.storage_controller.allowed_errors.extend(
        [
            # The shard might have moved or the pageserver hosting the shard restarted
            ".*Call to node.*management API.*failed.*",
            # Migrations have their timeouts set to 0
            ".*Timed out after.*downloading layers.*",
            ".*Failed to prepare by downloading layers.*",
            # The test may kill the storage controller or pageservers
            ".*request was dropped before completing.*",
        ]
    )
    for ps in env.pageservers:
        ps.allowed_errors.extend(
            [
                # We might re-write a layer in a different generation if the import
                # needs to redo some of the progress since not each job is checkpointed.
                ".*was unlinked but was not dangling.*",
                # The test may kill the storage controller or pageservers
                ".*request was dropped before completing.*",
                # Test can SIGTERM pageserver while it is downloading
                ".*removing local file.*temp_download.*",
                ".*Failed to flush heatmap.*",
                # Test can SIGTERM the storage controller while pageserver
                # is attempting to upcall.
                ".*storage controller upcall failed.*timeline_import_status.*",
                # TODO(vlad): TenantManager::reset_tenant returns a blanket anyhow error.
                # It should return ResourceUnavailable or something that doesn't error log.
                ".*activate_post_import.*InternalServerError.*tenant map is shutting down.*",
                # TODO(vlad): How can this happen?
                ".*Failed to download a remote file: deserialize index part file.*",
                ".*Cancelled request finished with an error.*",
            ]
        )

    importbucket_path = neon_env_builder.repo_dir / "test_import_chaos_bucket"
    mock_import_bucket(vanilla_pg, importbucket_path)

    tenant_id = TenantId.generate()
    timeline_id = TimelineId.generate()
    idempotency = ImportPgdataIdemptencyKey.random()

    env.storage_controller.tenant_create(
        tenant_id, shard_count=SHARD_COUNT, placement_policy={"Attached": 1}
    )
    env.storage_controller.reconcile_until_idle()

    env.storage_controller.timeline_create(
        tenant_id,
        {
            "new_timeline_id": str(timeline_id),
            "import_pgdata": {
                "idempotency_key": str(idempotency),
                "location": {"LocalFs": {"path": str(importbucket_path.absolute())}},
            },
        },
    )

    def chaos(stop_chaos: threading.Event):
        class ChaosType(StrEnum):
            MIGRATE_SHARD = "migrate_shard"
            RESTART_IMMEDIATE = "restart_immediate"
            RESTART = "restart"
            STORCON_RESTART_IMMEDIATE = "storcon_restart_immediate"

        while not stop_chaos.is_set():
            chaos_type = random.choices(
                population=[
                    ChaosType.MIGRATE_SHARD,
                    ChaosType.RESTART,
                    ChaosType.RESTART_IMMEDIATE,
                    ChaosType.STORCON_RESTART_IMMEDIATE,
                ],
                weights=[0.25, 0.25, 0.25, 0.25],
                k=1,
            )[0]

            try:
                if chaos_type == ChaosType.MIGRATE_SHARD:
                    target_shard_number = random.randint(0, SHARD_COUNT - 1)
                    target_shard = TenantShardId(tenant_id, target_shard_number, SHARD_COUNT)

                    placements = env.storage_controller.get_tenants_placement()
                    log.info(f"{placements=}")
                    target_ps = placements[str(target_shard)]["intent"]["attached"]
                    if len(placements[str(target_shard)]["intent"]["secondary"]) == 0:
                        dest_ps = None
                    else:
                        dest_ps = placements[str(target_shard)]["intent"]["secondary"][0]

                    if target_ps is None or dest_ps is None:
                        continue

                    config = StorageControllerMigrationConfig(
                        secondary_warmup_timeout="0s",
                        secondary_download_request_timeout="0s",
                        prewarm=False,
                    )
                    env.storage_controller.tenant_shard_migrate(target_shard, dest_ps, config)

                    log.info(
                        f"CHAOS: Migrating shard {target_shard} from pageserver {target_ps} to {dest_ps}"
                    )
                elif chaos_type == ChaosType.RESTART_IMMEDIATE:
                    target_ps = random.choice(env.pageservers)
                    log.info(f"CHAOS: Immediate restart of pageserver {target_ps.id}")
                    target_ps.stop(immediate=True)
                    target_ps.start()
                elif chaos_type == ChaosType.RESTART:
                    target_ps = random.choice(env.pageservers)
                    log.info(f"CHAOS: Normal restart of pageserver {target_ps.id}")
                    target_ps.stop(immediate=False)
                    target_ps.start()
                elif chaos_type == ChaosType.STORCON_RESTART_IMMEDIATE:
                    log.info("CHAOS: Immediate restart of storage controller")
                    env.storage_controller.stop(immediate=True)
                    env.storage_controller.start()
            except Exception as e:
                log.warning(f"CHAOS: Error during chaos operation {chaos_type}: {e}")

            # Sleep before next chaos event
            time.sleep(1)

        log.info("Chaos injector stopped")

    def wait_for_import_completion():
        start = time.time()
        done = import_completion_signaled.wait(ALLOWED_IMPORT_RUNTIME)
        if not done:
            raise TimeoutError(f"Import did not signal completion within {ALLOWED_IMPORT_RUNTIME}")

        end = time.time()

        log.info(f"Import completion signalled after {end - start}s {import_error=}")

        if import_error:
            raise RuntimeError(f"Import error: {import_error}")

    with concurrent.futures.ThreadPoolExecutor() as executor:
        stop_chaos = threading.Event()

        wait_for_import_completion_fut = executor.submit(wait_for_import_completion)
        chaos_fut = executor.submit(chaos, stop_chaos)

        try:
            wait_for_import_completion_fut.result()
        except Exception as e:
            raise e
        finally:
            stop_chaos.set()
            chaos_fut.result()

    import_branch_name = "imported"
    env.neon_cli.mappings_map_branch(import_branch_name, tenant_id, timeline_id)
    endpoint = env.endpoints.create_start(branch_name=import_branch_name, tenant_id=tenant_id)

    # Validate the imported data is legit
    assert endpoint.safe_psql_many(
        [
            "set effective_io_concurrency=32;",
            "SET statement_timeout='300s';",
            "select count(*), sum(data::bigint)::bigint from t",
        ]
    ) == [[], [], [(nrows, nrows * (nrows + 1) // 2)]]

    endpoint.stop()

    # Validate writes
    workload = Workload(env, tenant_id, timeline_id, branch_name=import_branch_name)
    workload.init()
    workload.write_rows(64)
    workload.validate()


def test_fast_import_with_pageserver_ingest(
    test_output_dir,
    vanilla_pg: VanillaPostgres,