mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-31 12:00:42 +00:00
Merge branch 'main' of https://github.com/neondatabase/neon into skyzh/fix-barrier
This commit is contained in:
@@ -25,8 +25,14 @@ def scan_pageserver_log_for_errors(
|
||||
|
||||
# It's an ERROR or WARN. Is it in the allow-list?
|
||||
for a in allowed_errors:
|
||||
if re.match(a, line):
|
||||
break
|
||||
try:
|
||||
if re.match(a, line):
|
||||
break
|
||||
# We can switch `re.error` with `re.PatternError` after 3.13
|
||||
# https://docs.python.org/3/library/re.html#re.PatternError
|
||||
except re.error:
|
||||
print(f"Invalid regex: '{a}'", file=sys.stderr)
|
||||
raise
|
||||
else:
|
||||
errors.append((lineno, line))
|
||||
return errors
|
||||
|
||||
@@ -665,6 +665,7 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
|
||||
force_l0_compaction=False,
|
||||
wait_until_uploaded=False,
|
||||
enhanced_gc_bottom_most_compaction=False,
|
||||
body: Optional[dict[str, Any]] = None,
|
||||
):
|
||||
self.is_testing_enabled_or_skip()
|
||||
query = {}
|
||||
@@ -683,6 +684,7 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
|
||||
res = self.put(
|
||||
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/compact",
|
||||
params=query,
|
||||
json=body,
|
||||
)
|
||||
log.info(f"Got compact request response code: {res.status_code}")
|
||||
self.verbose_error(res)
|
||||
|
||||
@@ -77,14 +77,16 @@ class MockS3Server:
|
||||
class LocalFsStorage:
|
||||
root: Path
|
||||
|
||||
def tenant_path(self, tenant_id: TenantId) -> Path:
|
||||
def tenant_path(self, tenant_id: Union[TenantId, TenantShardId]) -> Path:
|
||||
return self.root / "tenants" / str(tenant_id)
|
||||
|
||||
def timeline_path(self, tenant_id: TenantId, timeline_id: TimelineId) -> Path:
|
||||
def timeline_path(
|
||||
self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId
|
||||
) -> Path:
|
||||
return self.tenant_path(tenant_id) / "timelines" / str(timeline_id)
|
||||
|
||||
def timeline_latest_generation(
|
||||
self, tenant_id: TenantId, timeline_id: TimelineId
|
||||
self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId
|
||||
) -> Optional[int]:
|
||||
timeline_files = os.listdir(self.timeline_path(tenant_id, timeline_id))
|
||||
index_parts = [f for f in timeline_files if f.startswith("index_part")]
|
||||
@@ -102,7 +104,9 @@ class LocalFsStorage:
|
||||
raise RuntimeError(f"No index_part found for {tenant_id}/{timeline_id}")
|
||||
return generations[-1]
|
||||
|
||||
def index_path(self, tenant_id: TenantId, timeline_id: TimelineId) -> Path:
|
||||
def index_path(
|
||||
self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId
|
||||
) -> Path:
|
||||
latest_gen = self.timeline_latest_generation(tenant_id, timeline_id)
|
||||
if latest_gen is None:
|
||||
filename = TIMELINE_INDEX_PART_FILE_NAME
|
||||
@@ -126,7 +130,9 @@ class LocalFsStorage:
|
||||
filename = f"{local_name}-{generation:08x}"
|
||||
return self.timeline_path(tenant_id, timeline_id) / filename
|
||||
|
||||
def index_content(self, tenant_id: TenantId, timeline_id: TimelineId) -> Any:
|
||||
def index_content(
|
||||
self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId
|
||||
) -> Any:
|
||||
with self.index_path(tenant_id, timeline_id).open("r") as f:
|
||||
return json.load(f)
|
||||
|
||||
|
||||
@@ -495,8 +495,14 @@ def scan_log_for_errors(input: Iterable[str], allowed_errors: list[str]) -> list
|
||||
|
||||
# It's an ERROR or WARN. Is it in the allow-list?
|
||||
for a in allowed_errors:
|
||||
if re.match(a, line):
|
||||
break
|
||||
try:
|
||||
if re.match(a, line):
|
||||
break
|
||||
# We can switch `re.error` with `re.PatternError` after 3.13
|
||||
# https://docs.python.org/3/library/re.html#re.PatternError
|
||||
except re.error:
|
||||
log.error(f"Invalid regex: '{a}'")
|
||||
raise
|
||||
else:
|
||||
errors.append((lineno, line))
|
||||
return errors
|
||||
|
||||
@@ -116,6 +116,45 @@ page_cache_size=10
|
||||
assert vectored_average < 8
|
||||
|
||||
|
||||
def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder):
|
||||
env = neon_env_builder.init_start(initial_tenant_conf=AGGRESIVE_COMPACTION_TENANT_CONF)
|
||||
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
|
||||
row_count = 1000
|
||||
churn_rounds = 10
|
||||
|
||||
ps_http = env.pageserver.http_client()
|
||||
|
||||
workload = Workload(env, tenant_id, timeline_id)
|
||||
workload.init(env.pageserver.id)
|
||||
|
||||
log.info("Writing initial data ...")
|
||||
workload.write_rows(row_count, env.pageserver.id)
|
||||
|
||||
for i in range(1, churn_rounds + 1):
|
||||
if i % 10 == 0:
|
||||
log.info(f"Running churn round {i}/{churn_rounds} ...")
|
||||
|
||||
workload.churn_rows(row_count, env.pageserver.id)
|
||||
# Force L0 compaction to ensure the number of layers is within bounds, so that gc-compaction can run.
|
||||
ps_http.timeline_compact(tenant_id, timeline_id, force_l0_compaction=True)
|
||||
assert ps_http.perf_info(tenant_id, timeline_id)[0]["num_of_l0"] <= 1
|
||||
ps_http.timeline_compact(
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
enhanced_gc_bottom_most_compaction=True,
|
||||
body={
|
||||
"start": "000000000000000000000000000000000000",
|
||||
"end": "030000000000000000000000000000000000",
|
||||
},
|
||||
)
|
||||
|
||||
log.info("Validating at workload end ...")
|
||||
workload.validate(env.pageserver.id)
|
||||
|
||||
|
||||
# Stripe sizes in number of pages.
|
||||
TINY_STRIPES = 16
|
||||
LARGE_STRIPES = 32768
|
||||
|
||||
27
test_runner/regress/test_ondemand_wal_download.py
Normal file
27
test_runner/regress/test_ondemand_wal_download.py
Normal file
@@ -0,0 +1,27 @@
|
||||
from fixtures.neon_fixtures import NeonEnv
|
||||
|
||||
|
||||
def test_on_demand_wal_download(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
ep = env.endpoints.create_start(
|
||||
branch_name="main",
|
||||
endpoint_id="primary",
|
||||
config_lines=[
|
||||
"max_wal_size=32MB",
|
||||
"min_wal_size=32MB",
|
||||
"neon.logical_replication_max_snap_files=10000",
|
||||
],
|
||||
)
|
||||
|
||||
con = ep.connect()
|
||||
cur = con.cursor()
|
||||
cur.execute("CREATE TABLE t(pk bigint primary key, payload text)")
|
||||
cur.execute("ALTER TABLE t ALTER payload SET STORAGE external")
|
||||
cur.execute("select pg_create_logical_replication_slot('myslot', 'test_decoding', false, true)")
|
||||
cur.execute("insert into t values (generate_series(1,100000),repeat('?',10000))")
|
||||
|
||||
ep.stop("fast")
|
||||
ep.start()
|
||||
con = ep.connect()
|
||||
cur = con.cursor()
|
||||
cur.execute("select pg_replication_slot_advance('myslot', pg_current_wal_insert_lsn())")
|
||||
@@ -365,6 +365,19 @@ def test_live_migration(neon_env_builder: NeonEnvBuilder):
|
||||
workload.validate(pageserver_a.id)
|
||||
workload.validate(pageserver_b.id)
|
||||
|
||||
# Force compaction on destination pageserver
|
||||
pageserver_b.http_client().timeline_compact(tenant_id, timeline_id, force_l0_compaction=True)
|
||||
|
||||
# Destination pageserver is in AttachedMulti, it should have generated deletions but
|
||||
# not enqueued them yet.
|
||||
# Check deletion metrics via prometheus - should be 0 since we're in AttachedMulti
|
||||
assert (
|
||||
pageserver_b.http_client().get_metric_value(
|
||||
"pageserver_deletion_queue_submitted_total",
|
||||
)
|
||||
== 0
|
||||
)
|
||||
|
||||
# Revert the origin to secondary
|
||||
log.info("Setting origin to Secondary")
|
||||
pageserver_a.tenant_location_configure(
|
||||
@@ -389,6 +402,17 @@ def test_live_migration(neon_env_builder: NeonEnvBuilder):
|
||||
},
|
||||
)
|
||||
|
||||
# Transition to AttachedSingle should have drained deletions generated by doing a compaction
|
||||
# while in AttachedMulti.
|
||||
def blocked_deletions_drained():
|
||||
submitted = pageserver_b.http_client().get_metric_value(
|
||||
"pageserver_deletion_queue_submitted_total"
|
||||
)
|
||||
assert submitted is not None
|
||||
assert submitted > 0
|
||||
|
||||
wait_until(10, 0.1, blocked_deletions_drained)
|
||||
|
||||
workload.churn_rows(64, pageserver_b.id)
|
||||
workload.validate(pageserver_b.id)
|
||||
del workload
|
||||
|
||||
@@ -110,13 +110,15 @@ def post_checks(env: NeonEnv, test_output_dir: Path, db_name: str, endpoint: End
|
||||
|
||||
check_restored_datadir_content(test_output_dir, env, endpoint, ignored_files=ignored_files)
|
||||
|
||||
# Ensure that compaction works, on a timeline containing all the diversity that postgres regression tests create.
|
||||
# Ensure that compaction/GC works, on a timeline containing all the diversity that postgres regression tests create.
|
||||
# There should have been compactions mid-test as well, this final check is in addition those.
|
||||
for shard, pageserver in tenant_get_shards(env, env.initial_tenant):
|
||||
pageserver.http_client().timeline_checkpoint(
|
||||
shard, env.initial_timeline, force_repartition=True, force_image_layer_creation=True
|
||||
)
|
||||
|
||||
pageserver.http_client().timeline_gc(shard, env.initial_timeline, None)
|
||||
|
||||
|
||||
# Run the main PostgreSQL regression tests, in src/test/regress.
|
||||
#
|
||||
|
||||
@@ -19,7 +19,7 @@ from fixtures.neon_fixtures import (
|
||||
wait_for_last_flush_lsn,
|
||||
)
|
||||
from fixtures.pageserver.utils import assert_prefix_empty, assert_prefix_not_empty
|
||||
from fixtures.remote_storage import s3_storage
|
||||
from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind, s3_storage
|
||||
from fixtures.utils import skip_in_debug_build, wait_until
|
||||
from fixtures.workload import Workload
|
||||
from pytest_httpserver import HTTPServer
|
||||
@@ -515,11 +515,12 @@ def test_sharding_split_smoke(
|
||||
|
||||
"""
|
||||
|
||||
# We will start with 4 shards and split into 8, then migrate all those
|
||||
# 8 shards onto separate pageservers
|
||||
shard_count = 4
|
||||
split_shard_count = 8
|
||||
neon_env_builder.num_pageservers = split_shard_count * 2
|
||||
# Shard count we start with
|
||||
shard_count = 2
|
||||
# Shard count we split into
|
||||
split_shard_count = 4
|
||||
# We will have 2 shards per pageserver once done (including secondaries)
|
||||
neon_env_builder.num_pageservers = split_shard_count
|
||||
|
||||
# 1MiB stripes: enable getting some meaningful data distribution without
|
||||
# writing large quantities of data in this test. The stripe size is given
|
||||
@@ -591,7 +592,7 @@ def test_sharding_split_smoke(
|
||||
|
||||
workload.validate()
|
||||
|
||||
assert len(pre_split_pageserver_ids) == 4
|
||||
assert len(pre_split_pageserver_ids) == shard_count
|
||||
|
||||
def shards_on_disk(shard_ids):
|
||||
for pageserver in env.pageservers:
|
||||
@@ -654,9 +655,9 @@ def test_sharding_split_smoke(
|
||||
# - shard_count reconciles for the original setup of the tenant
|
||||
# - shard_count reconciles for detaching the original secondary locations during split
|
||||
# - split_shard_count reconciles during shard splitting, for setting up secondaries.
|
||||
# - shard_count of the child shards will need to fail over to their secondaries
|
||||
# - shard_count of the child shard secondary locations will get moved to emptier nodes
|
||||
expect_reconciles = shard_count * 2 + split_shard_count + shard_count * 2
|
||||
# - split_shard_count/2 of the child shards will need to fail over to their secondaries (since we have 8 shards and 4 pageservers, only 4 will move)
|
||||
expect_reconciles = shard_count * 2 + split_shard_count + split_shard_count / 2
|
||||
|
||||
reconcile_ok = env.storage_controller.get_metric_value(
|
||||
"storage_controller_reconcile_complete_total", filter={"status": "ok"}
|
||||
)
|
||||
@@ -720,22 +721,10 @@ def test_sharding_split_smoke(
|
||||
# dominated by shard count.
|
||||
log.info(f"total: {total}")
|
||||
assert total == {
|
||||
1: 1,
|
||||
2: 1,
|
||||
3: 1,
|
||||
4: 1,
|
||||
5: 1,
|
||||
6: 1,
|
||||
7: 1,
|
||||
8: 1,
|
||||
9: 1,
|
||||
10: 1,
|
||||
11: 1,
|
||||
12: 1,
|
||||
13: 1,
|
||||
14: 1,
|
||||
15: 1,
|
||||
16: 1,
|
||||
1: 2,
|
||||
2: 2,
|
||||
3: 2,
|
||||
4: 2,
|
||||
}
|
||||
|
||||
# The controller is not required to lay out the attached locations in any particular way, but
|
||||
@@ -1685,3 +1674,111 @@ def test_top_tenants(neon_env_builder: NeonEnvBuilder):
|
||||
)
|
||||
assert len(top["shards"]) == n_tenants - 4
|
||||
assert set(i["id"] for i in top["shards"]) == set(str(i[0]) for i in tenants[4:])
|
||||
|
||||
|
||||
def test_sharding_gc(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
):
|
||||
"""
|
||||
Exercise GC in a sharded tenant: because only shard 0 holds SLRU content, it acts as
|
||||
the "leader" for GC, and other shards read its index to learn what LSN they should
|
||||
GC up to.
|
||||
"""
|
||||
|
||||
shard_count = 4
|
||||
neon_env_builder.num_pageservers = shard_count
|
||||
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
|
||||
|
||||
TENANT_CONF = {
|
||||
# small checkpointing and compaction targets to ensure we generate many upload operations
|
||||
"checkpoint_distance": 128 * 1024,
|
||||
"compaction_threshold": 1,
|
||||
"compaction_target_size": 128 * 1024,
|
||||
# A short PITR horizon, so that we won't have to sleep too long in the test to wait for it to
|
||||
# happen.
|
||||
"pitr_interval": "1s",
|
||||
# disable background compaction and GC. We invoke it manually when we want it to happen.
|
||||
"gc_period": "0s",
|
||||
"compaction_period": "0s",
|
||||
# Disable automatic creation of image layers, as we will create them explicitly when we want them
|
||||
"image_creation_threshold": 9999,
|
||||
"image_layer_creation_check_threshold": 0,
|
||||
"lsn_lease_length": "0s",
|
||||
}
|
||||
env = neon_env_builder.init_start(
|
||||
initial_tenant_shard_count=shard_count, initial_tenant_conf=TENANT_CONF
|
||||
)
|
||||
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
|
||||
# Create a branch and write some data
|
||||
workload = Workload(env, tenant_id, timeline_id)
|
||||
initial_lsn = Lsn(workload.endpoint().safe_psql("SELECT pg_current_wal_lsn()")[0][0])
|
||||
log.info(f"Started at LSN: {initial_lsn}")
|
||||
|
||||
workload.init()
|
||||
|
||||
# Write enough data to generate multiple layers
|
||||
for _i in range(10):
|
||||
last_lsn = workload.write_rows(32)
|
||||
|
||||
assert last_lsn > initial_lsn
|
||||
|
||||
log.info(f"Wrote up to last LSN: {last_lsn}")
|
||||
|
||||
# Do full image layer generation. When we subsequently wait for PITR, all historic deltas
|
||||
# should be GC-able
|
||||
for shard_number in range(shard_count):
|
||||
shard = TenantShardId(tenant_id, shard_number, shard_count)
|
||||
env.get_tenant_pageserver(shard).http_client().timeline_compact(
|
||||
shard, timeline_id, force_image_layer_creation=True
|
||||
)
|
||||
|
||||
workload.churn_rows(32)
|
||||
|
||||
time.sleep(5)
|
||||
|
||||
# Invoke GC on a non-zero shard and verify its GC cutoff LSN does not advance
|
||||
shard_one = TenantShardId(tenant_id, 1, shard_count)
|
||||
env.get_tenant_pageserver(shard_one).http_client().timeline_gc(
|
||||
shard_one, timeline_id, gc_horizon=None
|
||||
)
|
||||
|
||||
# Check shard 1's index - GC cutoff LSN should not have advanced
|
||||
assert isinstance(env.pageserver_remote_storage, LocalFsStorage)
|
||||
shard_1_index = env.pageserver_remote_storage.index_content(
|
||||
tenant_id=shard_one, timeline_id=timeline_id
|
||||
)
|
||||
shard_1_gc_cutoff_lsn = Lsn(shard_1_index["metadata_bytes"]["latest_gc_cutoff_lsn"])
|
||||
log.info(f"Shard 1 cutoff LSN: {shard_1_gc_cutoff_lsn}")
|
||||
assert shard_1_gc_cutoff_lsn <= last_lsn
|
||||
|
||||
shard_zero = TenantShardId(tenant_id, 0, shard_count)
|
||||
env.get_tenant_pageserver(shard_zero).http_client().timeline_gc(
|
||||
shard_zero, timeline_id, gc_horizon=None
|
||||
)
|
||||
|
||||
# TODO: observe that GC LSN of shard 0 has moved forward in remote storage
|
||||
assert isinstance(env.pageserver_remote_storage, LocalFsStorage)
|
||||
shard_0_index = env.pageserver_remote_storage.index_content(
|
||||
tenant_id=shard_zero, timeline_id=timeline_id
|
||||
)
|
||||
shard_0_gc_cutoff_lsn = Lsn(shard_0_index["metadata_bytes"]["latest_gc_cutoff_lsn"])
|
||||
log.info(f"Shard 0 cutoff LSN: {shard_0_gc_cutoff_lsn}")
|
||||
assert shard_0_gc_cutoff_lsn >= last_lsn
|
||||
|
||||
# Invoke GC on all other shards and verify their GC cutoff LSNs
|
||||
for shard_number in range(1, shard_count):
|
||||
shard = TenantShardId(tenant_id, shard_number, shard_count)
|
||||
env.get_tenant_pageserver(shard).http_client().timeline_gc(
|
||||
shard, timeline_id, gc_horizon=None
|
||||
)
|
||||
|
||||
# Verify GC cutoff LSN advanced to match shard 0
|
||||
shard_index = env.pageserver_remote_storage.index_content(
|
||||
tenant_id=shard, timeline_id=timeline_id
|
||||
)
|
||||
shard_gc_cutoff_lsn = Lsn(shard_index["metadata_bytes"]["latest_gc_cutoff_lsn"])
|
||||
log.info(f"Shard {shard_number} cutoff LSN: {shard_gc_cutoff_lsn}")
|
||||
assert shard_gc_cutoff_lsn == shard_0_gc_cutoff_lsn
|
||||
|
||||
@@ -369,12 +369,16 @@ def test_create_churn_during_restart(neon_env_builder: NeonEnvBuilder):
|
||||
- Bad response codes during shutdown (e.g. returning 500 instead of 503)
|
||||
- Issues where a tenant is still starting up while we receive a request for it
|
||||
- Issues with interrupting/resuming tenant/timeline creation in shutdown
|
||||
- Issues with a timeline is not created successfully because of restart.
|
||||
"""
|
||||
env = neon_env_builder.init_configs()
|
||||
env.start()
|
||||
tenant_id: TenantId = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
|
||||
# At this point, the initial tenant/timeline might not have been created successfully,
|
||||
# and this is the case we want to test.
|
||||
|
||||
# Multiple creation requests which race will generate this error on the pageserver
|
||||
# and storage controller respectively
|
||||
env.pageserver.allowed_errors.append(".*Conflict: Tenant is already being modified.*")
|
||||
|
||||
@@ -1784,6 +1784,89 @@ def test_delete_force(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
|
||||
cur.execute("INSERT INTO t (key) VALUES (123)")
|
||||
|
||||
|
||||
def test_delete_timeline_under_load(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
Test deleting timelines on a safekeeper while they're under load.
|
||||
|
||||
This should not happen under normal operation, but it can happen if
|
||||
there is some rogue compute/pageserver that is writing/reading to a
|
||||
safekeeper that we're migrating a timeline away from, or if the timeline
|
||||
is being deleted while such a rogue client is running.
|
||||
"""
|
||||
neon_env_builder.auth_enabled = True
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
# Create two endpoints that will generate load
|
||||
timeline_id_a = env.create_branch("deleteme_a")
|
||||
timeline_id_b = env.create_branch("deleteme_b")
|
||||
|
||||
endpoint_a = env.endpoints.create("deleteme_a")
|
||||
endpoint_a.start()
|
||||
endpoint_b = env.endpoints.create("deleteme_b")
|
||||
endpoint_b.start()
|
||||
|
||||
# Get tenant and timeline IDs
|
||||
tenant_id = env.initial_tenant
|
||||
|
||||
# Start generating load on both timelines
|
||||
def generate_load(endpoint: Endpoint):
|
||||
with closing(endpoint.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("CREATE TABLE IF NOT EXISTS t(key int, value text)")
|
||||
while True:
|
||||
try:
|
||||
cur.execute("INSERT INTO t SELECT generate_series(1,1000), 'data'")
|
||||
except: # noqa
|
||||
# Ignore errors since timeline may be deleted
|
||||
break
|
||||
|
||||
t_a = threading.Thread(target=generate_load, args=(endpoint_a,))
|
||||
t_b = threading.Thread(target=generate_load, args=(endpoint_b,))
|
||||
try:
|
||||
t_a.start()
|
||||
t_b.start()
|
||||
|
||||
# Let the load run for a bit
|
||||
log.info("Warming up...")
|
||||
time.sleep(2)
|
||||
|
||||
# Safekeeper errors will propagate to the pageserver: it is correct that these are
|
||||
# logged at error severity because they indicate the pageserver is trying to read
|
||||
# a timeline that it shouldn't.
|
||||
env.pageserver.allowed_errors.extend(
|
||||
[
|
||||
".*Timeline.*was cancelled.*",
|
||||
".*Timeline.*was not found.*",
|
||||
]
|
||||
)
|
||||
|
||||
# Try deleting timelines while under load
|
||||
sk = env.safekeepers[0]
|
||||
sk_http = sk.http_client(auth_token=env.auth_keys.generate_tenant_token(tenant_id))
|
||||
|
||||
# Delete first timeline
|
||||
log.info(f"Deleting {timeline_id_a}...")
|
||||
assert sk_http.timeline_delete(tenant_id, timeline_id_a, only_local=True)["dir_existed"]
|
||||
|
||||
# Delete second timeline
|
||||
log.info(f"Deleting {timeline_id_b}...")
|
||||
assert sk_http.timeline_delete(tenant_id, timeline_id_b, only_local=True)["dir_existed"]
|
||||
|
||||
# Verify timelines are gone from disk
|
||||
sk_data_dir = sk.data_dir
|
||||
assert not (sk_data_dir / str(tenant_id) / str(timeline_id_a)).exists()
|
||||
# assert not (sk_data_dir / str(tenant_id) / str(timeline_id_b)).exists()
|
||||
|
||||
finally:
|
||||
log.info("Stopping endpoints...")
|
||||
# Stop endpoints with immediate mode because we deleted the timeline out from under the compute, which may cause it to hang
|
||||
endpoint_a.stop(mode="immediate")
|
||||
endpoint_b.stop(mode="immediate")
|
||||
log.info("Joining threads...")
|
||||
t_a.join()
|
||||
t_b.join()
|
||||
|
||||
|
||||
# Basic pull_timeline test.
|
||||
# When live_sk_change is False, compute is restarted to change set of
|
||||
# safekeepers; otherwise it is live reload.
|
||||
|
||||
Reference in New Issue
Block a user