safekeeper: reorder routes and their handlers.

Routes and their handlers were in a bit different order in 1) routes
list 2) their implementation 3) python client 4) openapi spec, making
addition of new ones intimidating. Make it the same everywhere, roughly
lexicographically but preserving some of existing logic.

No functional changes.
This commit is contained in:
Arseny Sher
2024-08-26 16:12:51 +03:00
committed by Arseny Sher
parent 7820c572e7
commit 09362b6363
3 changed files with 216 additions and 218 deletions

View File

@@ -86,42 +86,6 @@ paths:
default:
$ref: "#/components/responses/GenericError"
/v1/tenant/{tenant_id}/timeline/{source_timeline_id}/copy:
parameters:
- name: tenant_id
in: path
required: true
schema:
type: string
format: hex
- name: source_timeline_id
in: path
required: true
schema:
type: string
format: hex
post:
tags:
- "Timeline"
summary: Register new timeline as copy of existing timeline
description: ""
operationId: v1CopyTenantTimeline
requestBody:
content:
application/json:
schema:
$ref: "#/components/schemas/TimelineCopyRequest"
responses:
"201":
description: Timeline created
# TODO: return timeline info?
"403":
$ref: "#/components/responses/ForbiddenError"
default:
$ref: "#/components/responses/GenericError"
/v1/tenant/{tenant_id}/timeline/{timeline_id}:
parameters:
- name: tenant_id
@@ -179,6 +143,40 @@ paths:
default:
$ref: "#/components/responses/GenericError"
/v1/tenant/{tenant_id}/timeline/{source_timeline_id}/copy:
parameters:
- name: tenant_id
in: path
required: true
schema:
type: string
format: hex
- name: source_timeline_id
in: path
required: true
schema:
type: string
format: hex
post:
tags:
- "Timeline"
summary: Register new timeline as copy of existing timeline
description: ""
operationId: v1CopyTenantTimeline
requestBody:
content:
application/json:
schema:
$ref: "#/components/schemas/TimelineCopyRequest"
responses:
"201":
description: Timeline created
# TODO: return timeline info?
"403":
$ref: "#/components/responses/ForbiddenError"
default:
$ref: "#/components/responses/GenericError"
/v1/record_safekeeper_info/{tenant_id}/{timeline_id}:
parameters:

View File

@@ -114,7 +114,55 @@ fn check_permission(request: &Request<Body>, tenant_id: Option<TenantId>) -> Res
})
}
/// Deactivates all timelines for the tenant and removes its data directory.
/// See `timeline_delete_handler`.
async fn tenant_delete_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
let tenant_id = parse_request_param(&request, "tenant_id")?;
let only_local = parse_query_param(&request, "only_local")?.unwrap_or(false);
check_permission(&request, Some(tenant_id))?;
ensure_no_body(&mut request).await?;
// FIXME: `delete_force_all_for_tenant` can return an error for multiple different reasons;
// Using an `InternalServerError` should be fixed when the types support it
let delete_info = GlobalTimelines::delete_force_all_for_tenant(&tenant_id, only_local)
.await
.map_err(ApiError::InternalServerError)?;
json_response(
StatusCode::OK,
delete_info
.iter()
.map(|(ttid, resp)| (format!("{}", ttid.timeline_id), *resp))
.collect::<HashMap<String, TimelineDeleteForceResult>>(),
)
}
async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
let request_data: TimelineCreateRequest = json_request(&mut request).await?;
let ttid = TenantTimelineId {
tenant_id: request_data.tenant_id,
timeline_id: request_data.timeline_id,
};
check_permission(&request, Some(ttid.tenant_id))?;
let server_info = ServerInfo {
pg_version: request_data.pg_version,
system_id: request_data.system_id.unwrap_or(0),
wal_seg_size: request_data.wal_seg_size.unwrap_or(WAL_SEGMENT_SIZE as u32),
};
let local_start_lsn = request_data.local_start_lsn.unwrap_or_else(|| {
request_data
.commit_lsn
.segment_lsn(server_info.wal_seg_size as usize)
});
GlobalTimelines::create(ttid, server_info, request_data.commit_lsn, local_start_lsn)
.await
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, ())
}
/// List all (not deleted) timelines.
/// Note: it is possible to do the same with debug_dump.
async fn timeline_list_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permission(&request, None)?;
let res: Vec<TenantTimelineId> = GlobalTimelines::get_all()
@@ -174,30 +222,21 @@ async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body
json_response(StatusCode::OK, status)
}
async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
let request_data: TimelineCreateRequest = json_request(&mut request).await?;
let ttid = TenantTimelineId {
tenant_id: request_data.tenant_id,
timeline_id: request_data.timeline_id,
};
/// Deactivates the timeline and removes its data directory.
async fn timeline_delete_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
let ttid = TenantTimelineId::new(
parse_request_param(&request, "tenant_id")?,
parse_request_param(&request, "timeline_id")?,
);
let only_local = parse_query_param(&request, "only_local")?.unwrap_or(false);
check_permission(&request, Some(ttid.tenant_id))?;
let server_info = ServerInfo {
pg_version: request_data.pg_version,
system_id: request_data.system_id.unwrap_or(0),
wal_seg_size: request_data.wal_seg_size.unwrap_or(WAL_SEGMENT_SIZE as u32),
};
let local_start_lsn = request_data.local_start_lsn.unwrap_or_else(|| {
request_data
.commit_lsn
.segment_lsn(server_info.wal_seg_size as usize)
});
GlobalTimelines::create(ttid, server_info, request_data.commit_lsn, local_start_lsn)
ensure_no_body(&mut request).await?;
// FIXME: `delete_force` can fail from both internal errors and bad requests. Add better
// error handling here when we're able to.
let resp = GlobalTimelines::delete(&ttid, only_local)
.await
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, ())
json_response(StatusCode::OK, resp)
}
/// Pull timeline from peer safekeeper instances.
@@ -279,6 +318,46 @@ async fn timeline_copy_handler(mut request: Request<Body>) -> Result<Response<Bo
json_response(StatusCode::OK, ())
}
async fn patch_control_file_handler(
mut request: Request<Body>,
) -> Result<Response<Body>, ApiError> {
check_permission(&request, None)?;
let ttid = TenantTimelineId::new(
parse_request_param(&request, "tenant_id")?,
parse_request_param(&request, "timeline_id")?,
);
let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
let patch_request: patch_control_file::Request = json_request(&mut request).await?;
let response = patch_control_file::handle_request(tli, patch_request)
.await
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, response)
}
/// Force persist control file.
async fn timeline_checkpoint_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permission(&request, None)?;
let ttid = TenantTimelineId::new(
parse_request_param(&request, "tenant_id")?,
parse_request_param(&request, "timeline_id")?,
);
let tli = GlobalTimelines::get(ttid)?;
tli.write_shared_state()
.await
.sk
.state_mut()
.flush()
.await
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, ())
}
async fn timeline_digest_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let ttid = TenantTimelineId::new(
parse_request_param(&request, "tenant_id")?,
@@ -310,64 +389,6 @@ async fn timeline_digest_handler(request: Request<Body>) -> Result<Response<Body
json_response(StatusCode::OK, response)
}
/// Force persist control file.
async fn timeline_checkpoint_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permission(&request, None)?;
let ttid = TenantTimelineId::new(
parse_request_param(&request, "tenant_id")?,
parse_request_param(&request, "timeline_id")?,
);
let tli = GlobalTimelines::get(ttid)?;
tli.write_shared_state()
.await
.sk
.state_mut()
.flush()
.await
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, ())
}
/// Deactivates the timeline and removes its data directory.
async fn timeline_delete_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
let ttid = TenantTimelineId::new(
parse_request_param(&request, "tenant_id")?,
parse_request_param(&request, "timeline_id")?,
);
let only_local = parse_query_param(&request, "only_local")?.unwrap_or(false);
check_permission(&request, Some(ttid.tenant_id))?;
ensure_no_body(&mut request).await?;
// FIXME: `delete_force` can fail from both internal errors and bad requests. Add better
// error handling here when we're able to.
let resp = GlobalTimelines::delete(&ttid, only_local)
.await
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, resp)
}
/// Deactivates all timelines for the tenant and removes its data directory.
/// See `timeline_delete_handler`.
async fn tenant_delete_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
let tenant_id = parse_request_param(&request, "tenant_id")?;
let only_local = parse_query_param(&request, "only_local")?.unwrap_or(false);
check_permission(&request, Some(tenant_id))?;
ensure_no_body(&mut request).await?;
// FIXME: `delete_force_all_for_tenant` can return an error for multiple different reasons;
// Using an `InternalServerError` should be fixed when the types support it
let delete_info = GlobalTimelines::delete_force_all_for_tenant(&tenant_id, only_local)
.await
.map_err(ApiError::InternalServerError)?;
json_response(
StatusCode::OK,
delete_info
.iter()
.map(|(ttid, resp)| (format!("{}", ttid.timeline_id), *resp))
.collect::<HashMap<String, TimelineDeleteForceResult>>(),
)
}
/// Used only in tests to hand craft required data.
async fn record_safekeeper_info(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
let ttid = TenantTimelineId::new(
@@ -509,26 +530,6 @@ async fn dump_debug_handler(mut request: Request<Body>) -> Result<Response<Body>
Ok(response)
}
async fn patch_control_file_handler(
mut request: Request<Body>,
) -> Result<Response<Body>, ApiError> {
check_permission(&request, None)?;
let ttid = TenantTimelineId::new(
parse_request_param(&request, "tenant_id")?,
parse_request_param(&request, "timeline_id")?,
);
let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
let patch_request: patch_control_file::Request = json_request(&mut request).await?;
let response = patch_control_file::handle_request(tli, patch_request)
.await
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, response)
}
/// Safekeeper http router.
pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError> {
let mut router = endpoint::make_router();
@@ -568,6 +569,9 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
failpoints_handler(r, cancel).await
})
})
.delete("/v1/tenant/:tenant_id", |r| {
request_span(r, tenant_delete_handler)
})
// Will be used in the future instead of implicit timeline creation
.post("/v1/tenant/timeline", |r| {
request_span(r, timeline_create_handler)
@@ -581,16 +585,13 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
.delete("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
request_span(r, timeline_delete_handler)
})
.delete("/v1/tenant/:tenant_id", |r| {
request_span(r, tenant_delete_handler)
.post("/v1/pull_timeline", |r| {
request_span(r, timeline_pull_handler)
})
.get(
"/v1/tenant/:tenant_id/timeline/:timeline_id/snapshot/:destination_id",
|r| request_span(r, timeline_snapshot_handler),
)
.post("/v1/pull_timeline", |r| {
request_span(r, timeline_pull_handler)
})
.post(
"/v1/tenant/:tenant_id/timeline/:source_timeline_id/copy",
|r| request_span(r, timeline_copy_handler),
@@ -603,14 +604,13 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
"/v1/tenant/:tenant_id/timeline/:timeline_id/checkpoint",
|r| request_span(r, timeline_checkpoint_handler),
)
// for tests
.get("/v1/tenant/:tenant_id/timeline/:timeline_id/digest", |r| {
request_span(r, timeline_digest_handler)
})
.post("/v1/record_safekeeper_info/:tenant_id/:timeline_id", |r| {
request_span(r, record_safekeeper_info)
})
.get("/v1/debug_dump", |r| request_span(r, dump_debug_handler))
.get("/v1/tenant/:tenant_id/timeline/:timeline_id/digest", |r| {
request_span(r, timeline_digest_handler)
})
}
#[cfg(test)]

View File

@@ -65,6 +65,16 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter):
def check_status(self):
self.get(f"http://localhost:{self.port}/v1/status").raise_for_status()
def get_metrics_str(self) -> str:
"""You probably want to use get_metrics() instead."""
request_result = self.get(f"http://localhost:{self.port}/metrics")
request_result.raise_for_status()
return request_result.text
def get_metrics(self) -> SafekeeperMetrics:
res = self.get_metrics_str()
return SafekeeperMetrics(parse_metrics(res))
def is_testing_enabled_or_skip(self):
if not self.is_testing_enabled:
pytest.skip("safekeeper was built without 'testing' feature")
@@ -89,56 +99,8 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter):
assert res_json is None
return res_json
def debug_dump(self, params: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
params = params or {}
res = self.get(f"http://localhost:{self.port}/v1/debug_dump", params=params)
res.raise_for_status()
res_json = json.loads(res.text)
assert isinstance(res_json, dict)
return res_json
def patch_control_file(
self,
tenant_id: TenantId,
timeline_id: TimelineId,
patch: Dict[str, Any],
) -> Dict[str, Any]:
res = self.patch(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/control_file",
json={
"updates": patch,
"apply_fields": list(patch.keys()),
},
)
res.raise_for_status()
res_json = res.json()
assert isinstance(res_json, dict)
return res_json
def pull_timeline(self, body: Dict[str, Any]) -> Dict[str, Any]:
res = self.post(f"http://localhost:{self.port}/v1/pull_timeline", json=body)
res.raise_for_status()
res_json = res.json()
assert isinstance(res_json, dict)
return res_json
def copy_timeline(self, tenant_id: TenantId, timeline_id: TimelineId, body: Dict[str, Any]):
res = self.post(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/copy",
json=body,
)
res.raise_for_status()
def timeline_digest(
self, tenant_id: TenantId, timeline_id: TimelineId, from_lsn: Lsn, until_lsn: Lsn
) -> Dict[str, Any]:
res = self.get(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/digest",
params={
"from_lsn": str(from_lsn),
"until_lsn": str(until_lsn),
},
)
def tenant_delete_force(self, tenant_id: TenantId) -> Dict[Any, Any]:
res = self.delete(f"http://localhost:{self.port}/v1/tenant/{tenant_id}")
res.raise_for_status()
res_json = res.json()
assert isinstance(res_json, dict)
@@ -189,20 +151,6 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter):
def get_commit_lsn(self, tenant_id: TenantId, timeline_id: TimelineId) -> Lsn:
return self.timeline_status(tenant_id, timeline_id).commit_lsn
def record_safekeeper_info(self, tenant_id: TenantId, timeline_id: TimelineId, body):
res = self.post(
f"http://localhost:{self.port}/v1/record_safekeeper_info/{tenant_id}/{timeline_id}",
json=body,
)
res.raise_for_status()
def checkpoint(self, tenant_id: TenantId, timeline_id: TimelineId):
res = self.post(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/checkpoint",
json={},
)
res.raise_for_status()
# only_local doesn't remove segments in the remote storage.
def timeline_delete(
self, tenant_id: TenantId, timeline_id: TimelineId, only_local: bool = False
@@ -218,19 +166,71 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter):
assert isinstance(res_json, dict)
return res_json
def tenant_delete_force(self, tenant_id: TenantId) -> Dict[Any, Any]:
res = self.delete(f"http://localhost:{self.port}/v1/tenant/{tenant_id}")
def debug_dump(self, params: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
params = params or {}
res = self.get(f"http://localhost:{self.port}/v1/debug_dump", params=params)
res.raise_for_status()
res_json = json.loads(res.text)
assert isinstance(res_json, dict)
return res_json
def pull_timeline(self, body: Dict[str, Any]) -> Dict[str, Any]:
res = self.post(f"http://localhost:{self.port}/v1/pull_timeline", json=body)
res.raise_for_status()
res_json = res.json()
assert isinstance(res_json, dict)
return res_json
def get_metrics_str(self) -> str:
"""You probably want to use get_metrics() instead."""
request_result = self.get(f"http://localhost:{self.port}/metrics")
request_result.raise_for_status()
return request_result.text
def copy_timeline(self, tenant_id: TenantId, timeline_id: TimelineId, body: Dict[str, Any]):
res = self.post(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/copy",
json=body,
)
res.raise_for_status()
def get_metrics(self) -> SafekeeperMetrics:
res = self.get_metrics_str()
return SafekeeperMetrics(parse_metrics(res))
def patch_control_file(
self,
tenant_id: TenantId,
timeline_id: TimelineId,
patch: Dict[str, Any],
) -> Dict[str, Any]:
res = self.patch(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/control_file",
json={
"updates": patch,
"apply_fields": list(patch.keys()),
},
)
res.raise_for_status()
res_json = res.json()
assert isinstance(res_json, dict)
return res_json
def checkpoint(self, tenant_id: TenantId, timeline_id: TimelineId):
res = self.post(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/checkpoint",
json={},
)
res.raise_for_status()
def timeline_digest(
self, tenant_id: TenantId, timeline_id: TimelineId, from_lsn: Lsn, until_lsn: Lsn
) -> Dict[str, Any]:
res = self.get(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/digest",
params={
"from_lsn": str(from_lsn),
"until_lsn": str(until_lsn),
},
)
res.raise_for_status()
res_json = res.json()
assert isinstance(res_json, dict)
return res_json
def record_safekeeper_info(self, tenant_id: TenantId, timeline_id: TimelineId, body):
res = self.post(
f"http://localhost:{self.port}/v1/record_safekeeper_info/{tenant_id}/{timeline_id}",
json=body,
)
res.raise_for_status()