From 09362b63635d46cecef77cb04bd1e406dabd2026 Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Mon, 26 Aug 2024 16:12:51 +0300 Subject: [PATCH] safekeeper: reorder routes and their handlers. Routes and their handlers were in a bit different order in 1) routes list 2) their implementation 3) python client 4) openapi spec, making addition of new ones intimidating. Make it the same everywhere, roughly lexicographically but preserving some of existing logic. No functional changes. --- safekeeper/src/http/openapi_spec.yaml | 70 ++++---- safekeeper/src/http/routes.rs | 216 ++++++++++++------------ test_runner/fixtures/safekeeper/http.py | 148 ++++++++-------- 3 files changed, 216 insertions(+), 218 deletions(-) diff --git a/safekeeper/src/http/openapi_spec.yaml b/safekeeper/src/http/openapi_spec.yaml index a617e0310c..70999853c2 100644 --- a/safekeeper/src/http/openapi_spec.yaml +++ b/safekeeper/src/http/openapi_spec.yaml @@ -86,42 +86,6 @@ paths: default: $ref: "#/components/responses/GenericError" - /v1/tenant/{tenant_id}/timeline/{source_timeline_id}/copy: - parameters: - - name: tenant_id - in: path - required: true - schema: - type: string - format: hex - - name: source_timeline_id - in: path - required: true - schema: - type: string - format: hex - - post: - tags: - - "Timeline" - summary: Register new timeline as copy of existing timeline - description: "" - operationId: v1CopyTenantTimeline - requestBody: - content: - application/json: - schema: - $ref: "#/components/schemas/TimelineCopyRequest" - responses: - "201": - description: Timeline created - # TODO: return timeline info? - "403": - $ref: "#/components/responses/ForbiddenError" - default: - $ref: "#/components/responses/GenericError" - - /v1/tenant/{tenant_id}/timeline/{timeline_id}: parameters: - name: tenant_id @@ -179,6 +143,40 @@ paths: default: $ref: "#/components/responses/GenericError" + /v1/tenant/{tenant_id}/timeline/{source_timeline_id}/copy: + parameters: + - name: tenant_id + in: path + required: true + schema: + type: string + format: hex + - name: source_timeline_id + in: path + required: true + schema: + type: string + format: hex + + post: + tags: + - "Timeline" + summary: Register new timeline as copy of existing timeline + description: "" + operationId: v1CopyTenantTimeline + requestBody: + content: + application/json: + schema: + $ref: "#/components/schemas/TimelineCopyRequest" + responses: + "201": + description: Timeline created + # TODO: return timeline info? + "403": + $ref: "#/components/responses/ForbiddenError" + default: + $ref: "#/components/responses/GenericError" /v1/record_safekeeper_info/{tenant_id}/{timeline_id}: parameters: diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs index d11815f6ef..91ffa95c21 100644 --- a/safekeeper/src/http/routes.rs +++ b/safekeeper/src/http/routes.rs @@ -114,7 +114,55 @@ fn check_permission(request: &Request, tenant_id: Option) -> Res }) } +/// Deactivates all timelines for the tenant and removes its data directory. +/// See `timeline_delete_handler`. +async fn tenant_delete_handler(mut request: Request) -> Result, ApiError> { + let tenant_id = parse_request_param(&request, "tenant_id")?; + let only_local = parse_query_param(&request, "only_local")?.unwrap_or(false); + check_permission(&request, Some(tenant_id))?; + ensure_no_body(&mut request).await?; + // FIXME: `delete_force_all_for_tenant` can return an error for multiple different reasons; + // Using an `InternalServerError` should be fixed when the types support it + let delete_info = GlobalTimelines::delete_force_all_for_tenant(&tenant_id, only_local) + .await + .map_err(ApiError::InternalServerError)?; + json_response( + StatusCode::OK, + delete_info + .iter() + .map(|(ttid, resp)| (format!("{}", ttid.timeline_id), *resp)) + .collect::>(), + ) +} + +async fn timeline_create_handler(mut request: Request) -> Result, ApiError> { + let request_data: TimelineCreateRequest = json_request(&mut request).await?; + + let ttid = TenantTimelineId { + tenant_id: request_data.tenant_id, + timeline_id: request_data.timeline_id, + }; + check_permission(&request, Some(ttid.tenant_id))?; + + let server_info = ServerInfo { + pg_version: request_data.pg_version, + system_id: request_data.system_id.unwrap_or(0), + wal_seg_size: request_data.wal_seg_size.unwrap_or(WAL_SEGMENT_SIZE as u32), + }; + let local_start_lsn = request_data.local_start_lsn.unwrap_or_else(|| { + request_data + .commit_lsn + .segment_lsn(server_info.wal_seg_size as usize) + }); + GlobalTimelines::create(ttid, server_info, request_data.commit_lsn, local_start_lsn) + .await + .map_err(ApiError::InternalServerError)?; + + json_response(StatusCode::OK, ()) +} + /// List all (not deleted) timelines. +/// Note: it is possible to do the same with debug_dump. async fn timeline_list_handler(request: Request) -> Result, ApiError> { check_permission(&request, None)?; let res: Vec = GlobalTimelines::get_all() @@ -174,30 +222,21 @@ async fn timeline_status_handler(request: Request) -> Result) -> Result, ApiError> { - let request_data: TimelineCreateRequest = json_request(&mut request).await?; - - let ttid = TenantTimelineId { - tenant_id: request_data.tenant_id, - timeline_id: request_data.timeline_id, - }; +/// Deactivates the timeline and removes its data directory. +async fn timeline_delete_handler(mut request: Request) -> Result, ApiError> { + let ttid = TenantTimelineId::new( + parse_request_param(&request, "tenant_id")?, + parse_request_param(&request, "timeline_id")?, + ); + let only_local = parse_query_param(&request, "only_local")?.unwrap_or(false); check_permission(&request, Some(ttid.tenant_id))?; - - let server_info = ServerInfo { - pg_version: request_data.pg_version, - system_id: request_data.system_id.unwrap_or(0), - wal_seg_size: request_data.wal_seg_size.unwrap_or(WAL_SEGMENT_SIZE as u32), - }; - let local_start_lsn = request_data.local_start_lsn.unwrap_or_else(|| { - request_data - .commit_lsn - .segment_lsn(server_info.wal_seg_size as usize) - }); - GlobalTimelines::create(ttid, server_info, request_data.commit_lsn, local_start_lsn) + ensure_no_body(&mut request).await?; + // FIXME: `delete_force` can fail from both internal errors and bad requests. Add better + // error handling here when we're able to. + let resp = GlobalTimelines::delete(&ttid, only_local) .await .map_err(ApiError::InternalServerError)?; - - json_response(StatusCode::OK, ()) + json_response(StatusCode::OK, resp) } /// Pull timeline from peer safekeeper instances. @@ -279,6 +318,46 @@ async fn timeline_copy_handler(mut request: Request) -> Result, +) -> Result, ApiError> { + check_permission(&request, None)?; + + let ttid = TenantTimelineId::new( + parse_request_param(&request, "tenant_id")?, + parse_request_param(&request, "timeline_id")?, + ); + + let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?; + + let patch_request: patch_control_file::Request = json_request(&mut request).await?; + let response = patch_control_file::handle_request(tli, patch_request) + .await + .map_err(ApiError::InternalServerError)?; + + json_response(StatusCode::OK, response) +} + +/// Force persist control file. +async fn timeline_checkpoint_handler(request: Request) -> Result, ApiError> { + check_permission(&request, None)?; + + let ttid = TenantTimelineId::new( + parse_request_param(&request, "tenant_id")?, + parse_request_param(&request, "timeline_id")?, + ); + + let tli = GlobalTimelines::get(ttid)?; + tli.write_shared_state() + .await + .sk + .state_mut() + .flush() + .await + .map_err(ApiError::InternalServerError)?; + json_response(StatusCode::OK, ()) +} + async fn timeline_digest_handler(request: Request) -> Result, ApiError> { let ttid = TenantTimelineId::new( parse_request_param(&request, "tenant_id")?, @@ -310,64 +389,6 @@ async fn timeline_digest_handler(request: Request) -> Result) -> Result, ApiError> { - check_permission(&request, None)?; - - let ttid = TenantTimelineId::new( - parse_request_param(&request, "tenant_id")?, - parse_request_param(&request, "timeline_id")?, - ); - - let tli = GlobalTimelines::get(ttid)?; - tli.write_shared_state() - .await - .sk - .state_mut() - .flush() - .await - .map_err(ApiError::InternalServerError)?; - json_response(StatusCode::OK, ()) -} - -/// Deactivates the timeline and removes its data directory. -async fn timeline_delete_handler(mut request: Request) -> Result, ApiError> { - let ttid = TenantTimelineId::new( - parse_request_param(&request, "tenant_id")?, - parse_request_param(&request, "timeline_id")?, - ); - let only_local = parse_query_param(&request, "only_local")?.unwrap_or(false); - check_permission(&request, Some(ttid.tenant_id))?; - ensure_no_body(&mut request).await?; - // FIXME: `delete_force` can fail from both internal errors and bad requests. Add better - // error handling here when we're able to. - let resp = GlobalTimelines::delete(&ttid, only_local) - .await - .map_err(ApiError::InternalServerError)?; - json_response(StatusCode::OK, resp) -} - -/// Deactivates all timelines for the tenant and removes its data directory. -/// See `timeline_delete_handler`. -async fn tenant_delete_handler(mut request: Request) -> Result, ApiError> { - let tenant_id = parse_request_param(&request, "tenant_id")?; - let only_local = parse_query_param(&request, "only_local")?.unwrap_or(false); - check_permission(&request, Some(tenant_id))?; - ensure_no_body(&mut request).await?; - // FIXME: `delete_force_all_for_tenant` can return an error for multiple different reasons; - // Using an `InternalServerError` should be fixed when the types support it - let delete_info = GlobalTimelines::delete_force_all_for_tenant(&tenant_id, only_local) - .await - .map_err(ApiError::InternalServerError)?; - json_response( - StatusCode::OK, - delete_info - .iter() - .map(|(ttid, resp)| (format!("{}", ttid.timeline_id), *resp)) - .collect::>(), - ) -} - /// Used only in tests to hand craft required data. async fn record_safekeeper_info(mut request: Request) -> Result, ApiError> { let ttid = TenantTimelineId::new( @@ -509,26 +530,6 @@ async fn dump_debug_handler(mut request: Request) -> Result Ok(response) } -async fn patch_control_file_handler( - mut request: Request, -) -> Result, ApiError> { - check_permission(&request, None)?; - - let ttid = TenantTimelineId::new( - parse_request_param(&request, "tenant_id")?, - parse_request_param(&request, "timeline_id")?, - ); - - let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?; - - let patch_request: patch_control_file::Request = json_request(&mut request).await?; - let response = patch_control_file::handle_request(tli, patch_request) - .await - .map_err(ApiError::InternalServerError)?; - - json_response(StatusCode::OK, response) -} - /// Safekeeper http router. pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder { let mut router = endpoint::make_router(); @@ -568,6 +569,9 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder failpoints_handler(r, cancel).await }) }) + .delete("/v1/tenant/:tenant_id", |r| { + request_span(r, tenant_delete_handler) + }) // Will be used in the future instead of implicit timeline creation .post("/v1/tenant/timeline", |r| { request_span(r, timeline_create_handler) @@ -581,16 +585,13 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder .delete("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| { request_span(r, timeline_delete_handler) }) - .delete("/v1/tenant/:tenant_id", |r| { - request_span(r, tenant_delete_handler) + .post("/v1/pull_timeline", |r| { + request_span(r, timeline_pull_handler) }) .get( "/v1/tenant/:tenant_id/timeline/:timeline_id/snapshot/:destination_id", |r| request_span(r, timeline_snapshot_handler), ) - .post("/v1/pull_timeline", |r| { - request_span(r, timeline_pull_handler) - }) .post( "/v1/tenant/:tenant_id/timeline/:source_timeline_id/copy", |r| request_span(r, timeline_copy_handler), @@ -603,14 +604,13 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder "/v1/tenant/:tenant_id/timeline/:timeline_id/checkpoint", |r| request_span(r, timeline_checkpoint_handler), ) - // for tests + .get("/v1/tenant/:tenant_id/timeline/:timeline_id/digest", |r| { + request_span(r, timeline_digest_handler) + }) .post("/v1/record_safekeeper_info/:tenant_id/:timeline_id", |r| { request_span(r, record_safekeeper_info) }) .get("/v1/debug_dump", |r| request_span(r, dump_debug_handler)) - .get("/v1/tenant/:tenant_id/timeline/:timeline_id/digest", |r| { - request_span(r, timeline_digest_handler) - }) } #[cfg(test)] diff --git a/test_runner/fixtures/safekeeper/http.py b/test_runner/fixtures/safekeeper/http.py index dd3a0a3d54..05b43cfb72 100644 --- a/test_runner/fixtures/safekeeper/http.py +++ b/test_runner/fixtures/safekeeper/http.py @@ -65,6 +65,16 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter): def check_status(self): self.get(f"http://localhost:{self.port}/v1/status").raise_for_status() + def get_metrics_str(self) -> str: + """You probably want to use get_metrics() instead.""" + request_result = self.get(f"http://localhost:{self.port}/metrics") + request_result.raise_for_status() + return request_result.text + + def get_metrics(self) -> SafekeeperMetrics: + res = self.get_metrics_str() + return SafekeeperMetrics(parse_metrics(res)) + def is_testing_enabled_or_skip(self): if not self.is_testing_enabled: pytest.skip("safekeeper was built without 'testing' feature") @@ -89,56 +99,8 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter): assert res_json is None return res_json - def debug_dump(self, params: Optional[Dict[str, str]] = None) -> Dict[str, Any]: - params = params or {} - res = self.get(f"http://localhost:{self.port}/v1/debug_dump", params=params) - res.raise_for_status() - res_json = json.loads(res.text) - assert isinstance(res_json, dict) - return res_json - - def patch_control_file( - self, - tenant_id: TenantId, - timeline_id: TimelineId, - patch: Dict[str, Any], - ) -> Dict[str, Any]: - res = self.patch( - f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/control_file", - json={ - "updates": patch, - "apply_fields": list(patch.keys()), - }, - ) - res.raise_for_status() - res_json = res.json() - assert isinstance(res_json, dict) - return res_json - - def pull_timeline(self, body: Dict[str, Any]) -> Dict[str, Any]: - res = self.post(f"http://localhost:{self.port}/v1/pull_timeline", json=body) - res.raise_for_status() - res_json = res.json() - assert isinstance(res_json, dict) - return res_json - - def copy_timeline(self, tenant_id: TenantId, timeline_id: TimelineId, body: Dict[str, Any]): - res = self.post( - f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/copy", - json=body, - ) - res.raise_for_status() - - def timeline_digest( - self, tenant_id: TenantId, timeline_id: TimelineId, from_lsn: Lsn, until_lsn: Lsn - ) -> Dict[str, Any]: - res = self.get( - f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/digest", - params={ - "from_lsn": str(from_lsn), - "until_lsn": str(until_lsn), - }, - ) + def tenant_delete_force(self, tenant_id: TenantId) -> Dict[Any, Any]: + res = self.delete(f"http://localhost:{self.port}/v1/tenant/{tenant_id}") res.raise_for_status() res_json = res.json() assert isinstance(res_json, dict) @@ -189,20 +151,6 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter): def get_commit_lsn(self, tenant_id: TenantId, timeline_id: TimelineId) -> Lsn: return self.timeline_status(tenant_id, timeline_id).commit_lsn - def record_safekeeper_info(self, tenant_id: TenantId, timeline_id: TimelineId, body): - res = self.post( - f"http://localhost:{self.port}/v1/record_safekeeper_info/{tenant_id}/{timeline_id}", - json=body, - ) - res.raise_for_status() - - def checkpoint(self, tenant_id: TenantId, timeline_id: TimelineId): - res = self.post( - f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/checkpoint", - json={}, - ) - res.raise_for_status() - # only_local doesn't remove segments in the remote storage. def timeline_delete( self, tenant_id: TenantId, timeline_id: TimelineId, only_local: bool = False @@ -218,19 +166,71 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter): assert isinstance(res_json, dict) return res_json - def tenant_delete_force(self, tenant_id: TenantId) -> Dict[Any, Any]: - res = self.delete(f"http://localhost:{self.port}/v1/tenant/{tenant_id}") + def debug_dump(self, params: Optional[Dict[str, str]] = None) -> Dict[str, Any]: + params = params or {} + res = self.get(f"http://localhost:{self.port}/v1/debug_dump", params=params) + res.raise_for_status() + res_json = json.loads(res.text) + assert isinstance(res_json, dict) + return res_json + + def pull_timeline(self, body: Dict[str, Any]) -> Dict[str, Any]: + res = self.post(f"http://localhost:{self.port}/v1/pull_timeline", json=body) res.raise_for_status() res_json = res.json() assert isinstance(res_json, dict) return res_json - def get_metrics_str(self) -> str: - """You probably want to use get_metrics() instead.""" - request_result = self.get(f"http://localhost:{self.port}/metrics") - request_result.raise_for_status() - return request_result.text + def copy_timeline(self, tenant_id: TenantId, timeline_id: TimelineId, body: Dict[str, Any]): + res = self.post( + f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/copy", + json=body, + ) + res.raise_for_status() - def get_metrics(self) -> SafekeeperMetrics: - res = self.get_metrics_str() - return SafekeeperMetrics(parse_metrics(res)) + def patch_control_file( + self, + tenant_id: TenantId, + timeline_id: TimelineId, + patch: Dict[str, Any], + ) -> Dict[str, Any]: + res = self.patch( + f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/control_file", + json={ + "updates": patch, + "apply_fields": list(patch.keys()), + }, + ) + res.raise_for_status() + res_json = res.json() + assert isinstance(res_json, dict) + return res_json + + def checkpoint(self, tenant_id: TenantId, timeline_id: TimelineId): + res = self.post( + f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/checkpoint", + json={}, + ) + res.raise_for_status() + + def timeline_digest( + self, tenant_id: TenantId, timeline_id: TimelineId, from_lsn: Lsn, until_lsn: Lsn + ) -> Dict[str, Any]: + res = self.get( + f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/digest", + params={ + "from_lsn": str(from_lsn), + "until_lsn": str(until_lsn), + }, + ) + res.raise_for_status() + res_json = res.json() + assert isinstance(res_json, dict) + return res_json + + def record_safekeeper_info(self, tenant_id: TenantId, timeline_id: TimelineId, body): + res = self.post( + f"http://localhost:{self.port}/v1/record_safekeeper_info/{tenant_id}/{timeline_id}", + json=body, + ) + res.raise_for_status()