From d7d87f344506b1e50ac6c67d8e9e9f0a6037dd3d Mon Sep 17 00:00:00 2001 From: Tyr Chen Date: Sat, 31 May 2025 11:22:33 -0700 Subject: [PATCH] feature: use instructions for rules --- .cursor/rules/rust/core/code-quality.mdc | 261 ++++++++ .cursor/rules/rust/features/axum.mdc | 601 ++++++++++++++++++ .cursor/rules/rust/features/concurrency.mdc | 525 +++++++++++++++ .cursor/rules/rust/features/database.mdc | 428 +++++++++++++ .../rules/rust/features/tools-and-config.mdc | 583 +++++++++++++++++ 5 files changed, 2398 insertions(+) create mode 100644 .cursor/rules/rust/features/concurrency.mdc create mode 100644 .cursor/rules/rust/features/database.mdc create mode 100644 .cursor/rules/rust/features/tools-and-config.mdc diff --git a/.cursor/rules/rust/core/code-quality.mdc b/.cursor/rules/rust/core/code-quality.mdc index b93c988..e63beea 100644 --- a/.cursor/rules/rust/core/code-quality.mdc +++ b/.cursor/rules/rust/core/code-quality.mdc @@ -3,3 +3,264 @@ description: globs: alwaysApply: false --- +# ๐Ÿฆ€ RUST CORE CODE QUALITY STANDARDS + +> **TL;DR:** Essential code quality rules for all Rust projects, focusing on maintainable, production-ready code that follows modern Rust 2024 idioms. + +## ๐ŸŽฏ FUNDAMENTAL PRINCIPLES + +### Code Organization +- **Functionality-based files**: Use meaningful file names like `node.rs`, `workflow.rs`, `execution.rs` instead of generic `models.rs`, `traits.rs`, `types.rs` +- **Meaningful naming**: Avoid names like `WorkflowValidatorImpl` - use descriptive, specific names +- **File size limits**: Maximum 500 lines per file (excluding tests) +- **Function size**: Maximum 150 lines per function +- **Single Responsibility**: Each module should have one clear purpose + +### Rust Edition and Safety +- **Always use Rust 2024 edition** +- **Never use `unsafe` code** - find safe alternatives +- **Production-ready code**: All code must be deployable and maintainable +- **No `unwrap()` or `expect()`** in production code - use proper error handling + +## ๐Ÿ“ฆ DEPENDENCY MANAGEMENT + +### Workspace Dependencies Priority +```toml +# Always prefer workspace dependencies first +[dependencies] +tokio = { workspace = true } +serde = { workspace = true, features = ["derive"] } + +# Only add new dependencies if not available in workspace +# Request permission before modifying Cargo.toml +``` + +### Version Strategy +- **Always use latest versions** when adding new dependencies +- **Request permission** before modifying `Cargo.toml` +- **Check workspace first** - never duplicate dependencies unnecessarily + +## ๐Ÿ—๏ธ CODE STRUCTURE PATTERNS + +### Data Structure Organization +```rust +// โœ… Good: Functionality-based organization +// src/workflow.rs - All workflow-related types and logic +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] // Always use camelCase for JSON serialization +pub struct WorkflowDefinition { + pub workflow_id: String, + pub display_name: String, + pub created_at: DateTime, +} + +// โœ… Good: Meaningful trait names +pub trait WorkflowValidator { + fn validate(&self, workflow: &WorkflowDefinition) -> Result<(), ValidationError>; +} + +// โŒ Bad: Generic file organization +// src/models.rs, src/traits.rs, src/types.rs +// โŒ Bad: Poor naming +// struct WorkflowValidatorImpl +``` + +### Serde Configuration +```rust +// โœ… Always use camelCase for JSON serialization +#[derive(Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ApiResponse { + pub workflow_id: String, + pub created_at: DateTime, + pub 
is_active: bool, +} + +// This serializes to: +// {"workflowId": "...", "createdAt": "...", "isActive": true} +``` + +## ๐Ÿ”ง BUILD AND QUALITY CHECKS + +### Mandatory Verification Steps +After completing any code changes, **always run in order**: + +```bash +# 1. Build check +cargo build + +# 2. Test execution +cargo test + +# 3. Linting +cargo clippy + +# All must pass before considering code complete +``` + +### Clippy Configuration +```toml +# Cargo.toml +[lints.clippy] +all = "warn" +pedantic = "warn" +nursery = "warn" +unwrap_used = "deny" +expect_used = "deny" +``` + +## ๐Ÿ—‚๏ธ FILE NAMING CONVENTIONS + +### Module Organization Patterns +```rust +// โœ… Good: Feature-based modules +src/ +โ”œโ”€โ”€ workflow/ +โ”‚ โ”œโ”€โ”€ mod.rs +โ”‚ โ”œโ”€โ”€ validator.rs // WorkflowValidator trait and implementations +โ”‚ โ”œโ”€โ”€ executor.rs // WorkflowExecutor logic +โ”‚ โ””โ”€โ”€ definition.rs // WorkflowDefinition types +โ”œโ”€โ”€ node/ +โ”‚ โ”œโ”€โ”€ mod.rs +โ”‚ โ”œโ”€โ”€ registry.rs // NodeRegistry (not NodeTypeRegistry) +โ”‚ โ””โ”€โ”€ executor.rs // Node execution logic +โ””โ”€โ”€ storage/ + โ”œโ”€โ”€ mod.rs + โ”œโ”€โ”€ entities.rs // Database entities + โ””โ”€โ”€ repositories.rs // Data access patterns +``` + +### Naming Best Practices +```rust +// โœ… Good naming examples +pub struct WorkflowValidator; // Clear, specific +pub struct NodeExecutor; // Action-oriented +pub struct DatabaseConnection; // Descriptive + +// โŒ Bad naming examples +pub struct WorkflowValidatorImpl; // Unnecessary "Impl" suffix +pub struct Helper; // Too generic +pub struct Manager; // Vague responsibility +``` + +## ๐Ÿงช TESTING STANDARDS + +### Unit Test Placement +```rust +// โœ… Always place unit tests in the same file +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_workflow_validation() { + let validator = WorkflowValidator::new(); + let workflow = WorkflowDefinition::default(); + assert!(validator.validate(&workflow).is_ok()); + } +} + +// โŒ Don't create separate test files for unit tests +// tests/workflow_test.rs (this is for integration tests only) +``` + +### Test Naming +```rust +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_valid_workflow_passes_validation() { + // Test name clearly describes the scenario + } + + #[test] + fn test_empty_workflow_id_returns_error() { + // Specific about what's being tested + } +} +``` + +## ๐Ÿ“ DOCUMENTATION STANDARDS + +### Code Documentation +```rust +/// Validates workflow definitions according to business rules. 
+/// +/// # Examples +/// +/// ```rust +/// let validator = WorkflowValidator::new(); +/// let workflow = WorkflowDefinition::builder() +/// .workflow_id("test-workflow") +/// .build(); +/// +/// assert!(validator.validate(&workflow).is_ok()); +/// ``` +/// +/// # Errors +/// +/// Returns `ValidationError` if: +/// - Workflow ID is empty or invalid +/// - Required fields are missing +/// - Business rules are violated +pub struct WorkflowValidator { + rules: Vec, +} +``` + +## ๐Ÿšจ ANTI-PATTERNS TO AVOID + +### Code Organization Anti-Patterns +```rust +// โŒ Don't use generic file names +// src/models.rs - mixing unrelated types +// src/utils.rs - catch-all for random functions +// src/helpers.rs - unclear responsibility + +// โŒ Don't use implementation suffixes +pub struct WorkflowValidatorImpl; +pub struct DatabaseManagerImpl; + +// โŒ Don't mix concerns in single files +// src/app.rs containing database, validation, and HTTP logic + +// โŒ Don't use overly long files +// Any file > 500 lines (excluding tests) needs refactoring +``` + +### Dependency Anti-Patterns +```rust +// โŒ Don't duplicate workspace dependencies +[dependencies] +tokio = "1.0" # Already in workspace + +// โŒ Don't modify Cargo.toml without permission +# Always ask before adding new dependencies + +// โŒ Don't use outdated versions +serde = "0.9" # Use latest stable +``` + +## โœ… QUALITY CHECKLIST + +```markdown +### Code Quality Verification +- [ ] Uses Rust 2024 edition +- [ ] No `unsafe` code blocks +- [ ] No `unwrap()` or `expect()` in production code +- [ ] All data structures use `#[serde(rename_all = "camelCase")]` +- [ ] Files organized by functionality, not type +- [ ] Meaningful names (no "Impl" suffixes) +- [ ] Functions โ‰ค 150 lines +- [ ] Files โ‰ค 500 lines (excluding tests) +- [ ] Unit tests in same file as implementation +- [ ] `cargo build` passes +- [ ] `cargo test` passes +- [ ] `cargo clippy` passes with no warnings +- [ ] Public APIs documented with examples +- [ ] Workspace dependencies used when available +``` + +This code quality standard ensures consistent, maintainable, and production-ready Rust code across all projects. diff --git a/.cursor/rules/rust/features/axum.mdc b/.cursor/rules/rust/features/axum.mdc index b93c988..e4ef820 100644 --- a/.cursor/rules/rust/features/axum.mdc +++ b/.cursor/rules/rust/features/axum.mdc @@ -3,3 +3,604 @@ description: globs: alwaysApply: false --- +# ๐ŸŒ AXUM WEB FRAMEWORK BEST PRACTICES + +> **TL;DR:** Modern async web development with Axum, focusing on structured APIs, proper error handling, and OpenAPI documentation. 
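+
+Before the individual patterns, here is a minimal bootstrap sketch showing how the pieces defined below fit together. `AppState`, `create_app`, and `start_server` are the examples from this file; `AppConfig::load()` is assumed to come from the configuration rules and is illustrative only.
+
+```rust
+// Minimal sketch (illustrative): load config, then delegate to the
+// `start_server` helper defined later in this document.
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn std::error::Error>> {
+    // Assumed config loader; see the tools-and-config rules for AppConfig
+    let config = AppConfig::load()?;
+
+    // Builds AppState, creates the router, and serves it (defined below)
+    start_server(config).await
+}
+```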
+ +## ๐ŸŽฏ AXUM PROJECT STRUCTURE + +### Router Organization +```rust +use axum::{ + extract::{Path, Query, State}, + http::StatusCode, + response::Json, + routing::{get, post, put, delete}, + Router, +}; +use utoipa::{OpenApi, ToSchema}; +use utoipa_axum::{router::OpenApiRouter, routes}; + +// โœ… Good: Modular router structure +pub fn create_app(state: AppState) -> Router { + let (router, api) = OpenApiRouter::with_openapi(ApiDoc::openapi()) + .routes(routes!(health_check)) + .nest("/api/v1/workflows", workflows::router()) + .nest("/api/v1/nodes", nodes::router()) + .nest("/api/v1/executions", executions::router()) + .with_state(state) + .split_for_parts(); + + router.merge(utoipa_swagger_ui::SwaggerUi::new("/swagger-ui") + .url("/apidoc/openapi.json", api)) +} + +#[derive(OpenApi)] +#[openapi( + tags( + (name = "workflows", description = "Workflow management API"), + (name = "nodes", description = "Node management API"), + (name = "executions", description = "Execution management API") + ) +)] +struct ApiDoc; +``` + +### Application State Pattern +```rust +use sqlx::PgPool; +use std::sync::Arc; + +#[derive(Clone)] +pub struct AppState { + pub db: PgPool, + pub config: Arc, + pub node_registry: Arc, + pub event_bus: Arc, +} + +impl AppState { + pub async fn new(config: AppConfig) -> Result { + let db = create_connection_pool(&config.database.url).await?; + + // Run migrations + sqlx::migrate!("./migrations").run(&db).await?; + + let node_registry = Arc::new(NodeRegistry::new()); + let event_bus = Arc::new(EventBus::new(1000)); + + Ok(Self { + db, + config: Arc::new(config), + node_registry, + event_bus, + }) + } +} +``` + +## ๐Ÿ”ง REQUEST/RESPONSE PATTERNS + +### Path Parameters with Validation +```rust +use axum::extract::Path; +use serde::Deserialize; +use uuid::Uuid; +use utoipa::ToSchema; + +// โœ… Good: Axum 0.8 path parameter syntax +#[utoipa::path( + get, + path = "/api/v1/workflows/{workflow_id}", + params( + ("workflow_id" = Uuid, Path, description = "Workflow unique identifier") + ), + responses( + (status = 200, description = "Workflow found", body = WorkflowResponse), + (status = 404, description = "Workflow not found", body = ErrorResponse) + ), + tag = "workflows" +)] +pub async fn get_workflow( + State(state): State, + Path(workflow_id): Path, // Note: {workflow_id} in route, not :workflow_id +) -> Result, AppError> { + let workflow = state + .workflow_service + .get_by_id(workflow_id) + .await? 
+ .ok_or(AppError::NotFound("Workflow not found".to_string()))?; + + Ok(Json(WorkflowResponse::from(workflow))) +} + +// โŒ Wrong: Axum 0.7 syntax (outdated) +// Path("/api/v1/workflows/:workflow_id") // Don't use :workflow_id +``` + +### Query Parameters with Defaults +```rust +use serde::Deserialize; + +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ListQuery { + #[serde(default = "default_limit")] + pub limit: i64, + #[serde(default)] + pub offset: i64, + #[serde(default)] + pub sort_by: Option, + #[serde(default)] + pub sort_order: Option, +} + +fn default_limit() -> i64 { + 20 +} + +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "lowercase")] +pub enum SortOrder { + Asc, + Desc, +} + +#[utoipa::path( + get, + path = "/api/v1/workflows", + params(ListQuery), + responses( + (status = 200, description = "List of workflows", body = WorkflowListResponse) + ), + tag = "workflows" +)] +pub async fn list_workflows( + State(state): State, + Query(query): Query, +) -> Result, AppError> { + let workflows = state + .workflow_service + .list(query.limit, query.offset, query.sort_by, query.sort_order) + .await?; + + Ok(Json(WorkflowListResponse { workflows })) +} +``` + +### JSON Request/Response with Validation +```rust +use axum::Json; +use serde::{Deserialize, Serialize}; +use utoipa::ToSchema; +use validator::Validate; + +#[derive(Debug, Deserialize, Validate, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct CreateWorkflowRequest { + #[validate(length(min = 1, max = 100))] + pub name: String, + + #[validate(length(max = 500))] + pub description: Option, + + #[serde(default)] + pub workflow_data: serde_json::Value, + + #[serde(default)] + pub is_active: bool, +} + +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct WorkflowResponse { + pub id: Uuid, + pub name: String, + pub description: Option, + pub workflow_data: serde_json::Value, + pub is_active: bool, + pub created_at: DateTime, + pub updated_at: DateTime, +} + +#[utoipa::path( + post, + path = "/api/v1/workflows", + request_body = CreateWorkflowRequest, + responses( + (status = 201, description = "Workflow created", body = WorkflowResponse), + (status = 400, description = "Invalid request", body = ErrorResponse), + (status = 422, description = "Validation error", body = ValidationErrorResponse) + ), + tag = "workflows" +)] +pub async fn create_workflow( + State(state): State, + Json(request): Json, +) -> Result<(StatusCode, Json), AppError> { + // Validate request + request.validate() + .map_err(|e| AppError::Validation(e.to_string()))?; + + let workflow = state + .workflow_service + .create(request) + .await?; + + Ok((StatusCode::CREATED, Json(WorkflowResponse::from(workflow)))) +} +``` + +## ๐Ÿšจ ERROR HANDLING + +### Centralized Error Types +```rust +use axum::{ + http::StatusCode, + response::{IntoResponse, Response}, + Json, +}; +use serde::Serialize; +use thiserror::Error; +use utoipa::ToSchema; + +#[derive(Error, Debug)] +pub enum AppError { + #[error("Database error: {0}")] + Database(#[from] sqlx::Error), + + #[error("Validation error: {0}")] + Validation(String), + + #[error("Not found: {0}")] + NotFound(String), + + #[error("Unauthorized")] + Unauthorized, + + #[error("Forbidden")] + Forbidden, + + #[error("Internal server error: {0}")] + Internal(String), + + #[error("Bad request: {0}")] + BadRequest(String), +} + +#[derive(Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ErrorResponse { + pub error: 
String, + pub message: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub details: Option, +} + +impl IntoResponse for AppError { + fn into_response(self) -> Response { + let (status, error_message) = match &self { + AppError::Database(_) => (StatusCode::INTERNAL_SERVER_ERROR, "Database error"), + AppError::Validation(_) => (StatusCode::UNPROCESSABLE_ENTITY, "Validation error"), + AppError::NotFound(_) => (StatusCode::NOT_FOUND, "Not found"), + AppError::Unauthorized => (StatusCode::UNAUTHORIZED, "Unauthorized"), + AppError::Forbidden => (StatusCode::FORBIDDEN, "Forbidden"), + AppError::Internal(_) => (StatusCode::INTERNAL_SERVER_ERROR, "Internal server error"), + AppError::BadRequest(_) => (StatusCode::BAD_REQUEST, "Bad request"), + }; + + let error_response = ErrorResponse { + error: error_message.to_string(), + message: self.to_string(), + details: None, + }; + + (status, Json(error_response)).into_response() + } +} +``` + +### Request Validation Middleware +```rust +use axum::{ + extract::Request, + middleware::Next, + response::Response, +}; +use tower::ServiceBuilder; + +pub fn create_middleware_stack() -> ServiceBuilder< + tower::ServiceBuilder< + tower::ServiceBuilder< + tower::ServiceBuilder + > + > +> { + ServiceBuilder::new() + .layer(tower_http::cors::CorsLayer::permissive()) + .layer(tower_http::trace::TraceLayer::new_for_http()) + .layer(axum::middleware::from_fn(request_id_middleware)) + .layer(tower_http::timeout::TimeoutLayer::new( + std::time::Duration::from_secs(30) + )) +} + +pub async fn request_id_middleware( + mut request: Request, + next: Next, +) -> Result { + let request_id = uuid::Uuid::new_v4().to_string(); + request.headers_mut().insert( + "X-Request-ID", + request_id.parse().unwrap(), + ); + + Ok(next.run(request).await) +} +``` + +## ๐Ÿ“š OPENAPI DOCUMENTATION + +### Complete API Documentation +```rust +use utoipa::{OpenApi, ToSchema}; +use utoipa_axum::router::OpenApiRouter; + +#[derive(OpenApi)] +#[openapi( + info( + title = "Workflow Engine API", + version = "1.0.0", + description = "A comprehensive workflow management system", + contact( + name = "API Support", + email = "support@example.com" + ) + ), + paths( + // Workflow endpoints + workflows::get_workflow, + workflows::list_workflows, + workflows::create_workflow, + workflows::update_workflow, + workflows::delete_workflow, + workflows::execute_workflow, + + // Node endpoints + nodes::list_node_types, + nodes::get_node_schema, + + // Execution endpoints + executions::get_execution, + executions::list_executions, + ), + components( + schemas( + WorkflowResponse, + CreateWorkflowRequest, + UpdateWorkflowRequest, + WorkflowListResponse, + NodeTypeInfo, + ExecutionResponse, + ErrorResponse, + ) + ), + tags( + (name = "workflows", description = "Workflow management operations"), + (name = "nodes", description = "Node type information and schemas"), + (name = "executions", description = "Workflow execution tracking") + ) +)] +pub struct ApiDoc; + +// โœ… Good: Detailed endpoint documentation +#[utoipa::path( + put, + path = "/api/v1/workflows/{workflow_id}", + params( + ("workflow_id" = Uuid, Path, description = "Workflow unique identifier") + ), + request_body( + content = UpdateWorkflowRequest, + description = "Workflow update data", + content_type = "application/json" + ), + responses( + (status = 200, description = "Workflow updated successfully", body = WorkflowResponse), + (status = 400, description = "Invalid request data", body = ErrorResponse), + (status = 404, description = "Workflow 
not found", body = ErrorResponse), + (status = 422, description = "Validation failed", body = ErrorResponse) + ), + tag = "workflows", + summary = "Update workflow", + description = "Updates an existing workflow with new data. Only provided fields will be updated." +)] +pub async fn update_workflow( + State(state): State, + Path(workflow_id): Path, + Json(request): Json, +) -> Result, AppError> { + // Implementation + todo!() +} +``` + +## ๐Ÿงช TESTING AXUM HANDLERS + +### Integration Testing +```rust +#[cfg(test)] +mod tests { + use super::*; + use axum::{ + body::Body, + http::{Request, StatusCode}, + }; + use serde_json::json; + use tower::ServiceExt; + + async fn setup_test_app() -> Router { + let state = AppState::new_test().await; + create_app(state) + } + + #[tokio::test] + async fn test_create_workflow_success() { + let app = setup_test_app().await; + + let request_body = json!({ + "name": "Test Workflow", + "description": "A test workflow", + "workflowData": {}, + "isActive": true + }); + + let request = Request::builder() + .method("POST") + .uri("/api/v1/workflows") + .header("Content-Type", "application/json") + .body(Body::from(request_body.to_string())) + .unwrap(); + + let response = app.oneshot(request).await.unwrap(); + assert_eq!(response.status(), StatusCode::CREATED); + + let body = hyper::body::to_bytes(response.into_body()).await.unwrap(); + let workflow: WorkflowResponse = serde_json::from_slice(&body).unwrap(); + assert_eq!(workflow.name, "Test Workflow"); + } + + #[tokio::test] + async fn test_get_workflow_not_found() { + let app = setup_test_app().await; + let non_existent_id = Uuid::new_v4(); + + let request = Request::builder() + .method("GET") + .uri(&format!("/api/v1/workflows/{}", non_existent_id)) + .body(Body::empty()) + .unwrap(); + + let response = app.oneshot(request).await.unwrap(); + assert_eq!(response.status(), StatusCode::NOT_FOUND); + } + + #[tokio::test] + async fn test_validation_error() { + let app = setup_test_app().await; + + let invalid_request = json!({ + "name": "", // Empty name should fail validation + "description": "Valid description" + }); + + let request = Request::builder() + .method("POST") + .uri("/api/v1/workflows") + .header("Content-Type", "application/json") + .body(Body::from(invalid_request.to_string())) + .unwrap(); + + let response = app.oneshot(request).await.unwrap(); + assert_eq!(response.status(), StatusCode::UNPROCESSABLE_ENTITY); + } +} +``` + +## ๐Ÿ”ง SERVER CONFIGURATION + +### Production Server Setup +```rust +use axum::serve; +use std::net::SocketAddr; +use tokio::net::TcpListener; +use tower_http::trace::TraceLayer; +use tracing::{info, error}; + +pub async fn start_server(config: AppConfig) -> Result<(), Box> { + let state = AppState::new(config.clone()).await?; + let app = create_app(state) + .layer(create_middleware_stack()); + + let addr = SocketAddr::from(([0, 0, 0, 0], config.server.port)); + let listener = TcpListener::bind(addr).await?; + + info!("Server starting on {}", addr); + + serve(listener, app) + .await + .map_err(|e| { + error!("Server error: {}", e); + e.into() + }) +} + +// Graceful shutdown handling +pub async fn start_server_with_shutdown( + config: AppConfig, + shutdown_signal: impl std::future::Future + Send + 'static, +) -> Result<(), Box> { + let state = AppState::new(config.clone()).await?; + let app = create_app(state) + .layer(create_middleware_stack()); + + let addr = SocketAddr::from(([0, 0, 0, 0], config.server.port)); + let listener = TcpListener::bind(addr).await?; + + 
info!("Server starting on {}", addr); + + serve(listener, app) + .with_graceful_shutdown(shutdown_signal) + .await + .map_err(|e| { + error!("Server error: {}", e); + e.into() + }) +} +``` + +## ๐Ÿšจ AXUM ANTI-PATTERNS + +### What to Avoid +```rust +// โŒ Don't use old Axum 0.7 path syntax +// #[utoipa::path(get, path = "/workflows/:id")] // Use {id} instead + +// โŒ Don't forget Content-Type header for JSON endpoints +// Missing: .header("Content-Type", "application/json") + +// โŒ Don't ignore validation +// pub async fn create_user(Json(request): Json) { +// // Missing: request.validate()? +// } + +// โŒ Don't use std::sync in Axum handlers +// pub async fn bad_handler(State(state): State>>) { +// let data = state.lock().unwrap(); // Blocks async runtime +// } + +// โŒ Don't return bare strings for JSON APIs +// pub async fn bad_endpoint() -> String { +// "success".to_string() // Should return Json +// } +``` + +## โœ… AXUM CHECKLIST + +```markdown +### Axum Implementation Verification +- [ ] Uses Axum 0.8 path syntax with {param} not :param +- [ ] All JSON structs use #[serde(rename_all = "camelCase")] +- [ ] Comprehensive OpenAPI documentation with utoipa +- [ ] Proper error handling with IntoResponse trait +- [ ] Request validation with validator crate +- [ ] Structured application state with Clone trait +- [ ] Integration tests for all endpoints +- [ ] Middleware stack includes CORS, tracing, timeouts +- [ ] Graceful shutdown implementation +- [ ] No blocking operations in async handlers +- [ ] Content-Type headers properly set +- [ ] HTTP status codes correctly used +``` + +This Axum standard ensures robust, well-documented, and maintainable web APIs following modern Rust async patterns. diff --git a/.cursor/rules/rust/features/concurrency.mdc b/.cursor/rules/rust/features/concurrency.mdc new file mode 100644 index 0000000..56fb3ac --- /dev/null +++ b/.cursor/rules/rust/features/concurrency.mdc @@ -0,0 +1,525 @@ +--- +description: +globs: +alwaysApply: false +--- +# โšก RUST CONCURRENCY BEST PRACTICES + +> **TL;DR:** Modern async/await patterns and thread-safe data structures for high-performance Rust applications. 
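+
+As a quick orientation before the detailed sections, the sketch below (illustrative only) combines the three building blocks this file keeps returning to: `Arc` for shared ownership, `tokio::sync` locks instead of `std::sync`, and `JoinSet` for structured task management.
+
+```rust
+use std::sync::Arc;
+use tokio::{sync::RwLock, task::JoinSet};
+
+#[tokio::main]
+async fn main() {
+    // Shared state behind an async-aware lock (never std::sync in async code)
+    let counter = Arc::new(RwLock::new(0u64));
+    let mut tasks = JoinSet::new();
+
+    for _ in 0..4 {
+        let counter = Arc::clone(&counter);
+        tasks.spawn(async move {
+            // Awaiting the lock yields to the runtime instead of blocking it
+            *counter.write().await += 1;
+        });
+    }
+
+    // Structured join: every spawned task is awaited before results are read
+    while let Some(result) = tasks.join_next().await {
+        if let Err(e) = result {
+            eprintln!("task failed: {e}");
+        }
+    }
+
+    println!("counter = {}", *counter.read().await);
+}
+```
+
+Joining through the `JoinSet` rather than detaching tasks is what makes the shutdown and error-propagation patterns later in this file tractable.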
+ +## ๐ŸŽฏ ASYNC RUNTIME SELECTION + +### Tokio as the Standard +- **Always use Tokio** for async runtime +- **Use tokio::sync primitives** instead of std::sync for async code +- **Leverage async/await patterns** throughout the application +- **Avoid blocking operations** in async contexts + +```toml +# Cargo.toml - Tokio configuration +[dependencies] +tokio = { workspace = true, features = ["rt-multi-thread", "macros", "sync", "time", "fs"] } +``` + +## ๐Ÿ”’ SYNCHRONIZATION PRIMITIVES + +### Tokio Sync Over Std Sync +```rust +// โœ… Preferred: Use tokio synchronization primitives +use tokio::sync::{RwLock, Mutex, broadcast, mpsc, oneshot}; +use std::sync::Arc; + +// โœ… Good: Async-friendly RwLock +pub struct WorkflowCache { + data: Arc>>, +} + +impl WorkflowCache { + pub fn new() -> Self { + Self { + data: Arc::new(RwLock::new(HashMap::new())), + } + } + + pub async fn get(&self, id: &str) -> Option { + let data = self.data.read().await; + data.get(id).cloned() + } + + pub async fn insert(&self, id: String, workflow: WorkflowDefinition) { + let mut data = self.data.write().await; + data.insert(id, workflow); + } + + pub async fn remove(&self, id: &str) -> Option { + let mut data = self.data.write().await; + data.remove(id) + } +} + +// โŒ Avoid: std::sync in async contexts +// use std::sync::{RwLock, Mutex}; // Blocks async runtime +// use parking_lot::{RwLock, Mutex}; // Also blocking +``` + +### DashMap for Concurrent Collections +```rust +use dashmap::DashMap; +use std::sync::Arc; + +// โœ… Preferred: DashMap for concurrent hash maps +pub struct NodeRegistry { + nodes: Arc>>, + categories: Arc>>, +} + +impl NodeRegistry { + pub fn new() -> Self { + Self { + nodes: Arc::new(DashMap::new()), + categories: Arc::new(DashMap::new()), + } + } + + pub fn register_node(&self, id: String, node: Box) { + let category = node.category().to_string(); + + // Insert the node + self.nodes.insert(id.clone(), node); + + // Update category index + self.categories + .entry(category) + .or_insert_with(Vec::new) + .push(id); + } + + pub fn get_node(&self, id: &str) -> Option>> { + self.nodes.get(id) + } + + pub fn list_by_category(&self, category: &str) -> Vec { + self.categories + .get(category) + .map(|entry| entry.clone()) + .unwrap_or_default() + } + + pub fn list_all_nodes(&self) -> Vec { + self.nodes.iter().map(|entry| entry.key().clone()).collect() + } +} + +// โŒ Avoid: Mutex for concurrent access +// pub struct BadNodeRegistry { +// nodes: Arc>>> +// } +``` + +## ๐Ÿ“ก CHANNEL PATTERNS + +### Multi-Producer Single-Consumer (MPSC) +```rust +use tokio::sync::mpsc; +use tracing::{info, error}; + +pub struct EventProcessor { + sender: mpsc::UnboundedSender, +} + +impl EventProcessor { + pub fn new() -> (Self, EventProcessorHandle) { + let (tx, rx) = mpsc::unbounded_channel(); + + let handle = EventProcessorHandle::new(rx); + let processor = Self { sender: tx }; + + (processor, handle) + } + + pub fn send_event(&self, event: WorkflowEvent) -> Result<(), mpsc::error::SendError> { + self.sender.send(event) + } +} + +pub struct EventProcessorHandle { + receiver: mpsc::UnboundedReceiver, +} + +impl EventProcessorHandle { + fn new(receiver: mpsc::UnboundedReceiver) -> Self { + Self { receiver } + } + + pub async fn run(mut self) { + while let Some(event) = self.receiver.recv().await { + if let Err(e) = self.process_event(event).await { + error!("Failed to process event: {}", e); + } + } + info!("Event processor stopped"); + } + + async fn process_event(&self, event: WorkflowEvent) -> Result<(), 
ProcessingError> { + match event { + WorkflowEvent::Started { workflow_id, .. } => { + info!("Workflow {} started", workflow_id); + // Process workflow start + } + WorkflowEvent::Completed { workflow_id, .. } => { + info!("Workflow {} completed", workflow_id); + // Process workflow completion + } + WorkflowEvent::Failed { workflow_id, error, .. } => { + error!("Workflow {} failed: {}", workflow_id, error); + // Process workflow failure + } + } + Ok(()) + } +} +``` + +### Broadcast for Multiple Subscribers +```rust +use tokio::sync::broadcast; + +pub struct EventBus { + sender: broadcast::Sender, +} + +impl EventBus { + pub fn new(capacity: usize) -> Self { + let (sender, _) = broadcast::channel(capacity); + Self { sender } + } + + pub fn publish(&self, event: SystemEvent) -> Result> { + self.sender.send(event) + } + + pub fn subscribe(&self) -> broadcast::Receiver { + self.sender.subscribe() + } +} + +// Usage example +pub async fn start_event_monitoring(event_bus: Arc) { + let mut receiver = event_bus.subscribe(); + + tokio::spawn(async move { + while let Ok(event) = receiver.recv().await { + match event { + SystemEvent::NodeExecutionStarted { node_id, .. } => { + info!("Node {} started execution", node_id); + } + SystemEvent::NodeExecutionCompleted { node_id, .. } => { + info!("Node {} completed execution", node_id); + } + SystemEvent::SystemShutdown => { + info!("System shutdown requested"); + break; + } + } + } + }); +} +``` + +### Oneshot for Single Response +```rust +use tokio::sync::oneshot; + +pub struct AsyncValidator { + // Internal state +} + +impl AsyncValidator { + pub async fn validate_workflow(&self, workflow: WorkflowDefinition) -> Result { + let (tx, rx) = oneshot::channel(); + + // Spawn validation task + let workflow_clone = workflow.clone(); + tokio::spawn(async move { + let result = perform_validation(workflow_clone).await; + let _ = tx.send(result); + }); + + // Wait for validation result + rx.await + .map_err(|_| ValidationError::Internal("Validation task cancelled".to_string()))? 
+ } +} + +async fn perform_validation(workflow: WorkflowDefinition) -> Result { + // Expensive validation logic + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + + if workflow.nodes.is_empty() { + return Err(ValidationError::EmptyWorkflow); + } + + Ok(ValidationResult::Valid) +} +``` + +## ๐Ÿƒ TASK MANAGEMENT + +### Structured Concurrency with JoinSet +```rust +use tokio::task::JoinSet; +use std::collections::HashMap; + +pub struct WorkflowExecutor { + // Internal state +} + +impl WorkflowExecutor { + pub async fn execute_workflow_parallel(&self, workflow: &WorkflowDefinition) -> Result { + let mut join_set = JoinSet::new(); + let mut results = HashMap::new(); + + // Execute nodes in parallel where possible + for node in &workflow.nodes { + if self.can_execute_parallel(node, &results) { + let node_clone = node.clone(); + let executor = self.clone(); + + join_set.spawn(async move { + let result = executor.execute_node(&node_clone).await; + (node_clone.id.clone(), result) + }); + } + } + + // Collect results + while let Some(result) = join_set.join_next().await { + match result { + Ok((node_id, execution_result)) => { + results.insert(node_id, execution_result?); + } + Err(join_error) => { + return Err(ExecutionError::TaskFailed(join_error.to_string())); + } + } + } + + Ok(ExecutionResult { node_results: results }) + } + + fn can_execute_parallel(&self, node: &NodeDefinition, completed_results: &HashMap) -> bool { + // Check if all dependencies are satisfied + node.dependencies.iter().all(|dep| completed_results.contains_key(dep)) + } +} +``` + +### Graceful Shutdown Pattern +```rust +use tokio::sync::broadcast; +use tokio_util::sync::CancellationToken; + +pub struct Application { + shutdown_token: CancellationToken, + tasks: Vec>, +} + +impl Application { + pub fn new() -> Self { + Self { + shutdown_token: CancellationToken::new(), + tasks: Vec::new(), + } + } + + pub async fn start(&mut self) -> Result<(), ApplicationError> { + // Start background services + self.start_workflow_executor().await?; + self.start_event_processor().await?; + self.start_health_monitor().await?; + + // Wait for shutdown signal + self.wait_for_shutdown().await; + + // Graceful shutdown + self.shutdown_gracefully().await + } + + async fn start_workflow_executor(&mut self) -> Result<(), ApplicationError> { + let token = self.shutdown_token.clone(); + + let handle = tokio::spawn(async move { + loop { + tokio::select! { + _ = token.cancelled() => { + info!("Workflow executor shutdown requested"); + break; + } + _ = tokio::time::sleep(tokio::time::Duration::from_secs(1)) => { + // Process workflows + } + } + } + }); + + self.tasks.push(handle); + Ok(()) + } + + async fn wait_for_shutdown(&self) { + // Listen for shutdown signals + let mut sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()).unwrap(); + let mut sigint = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt()).unwrap(); + + tokio::select! { + _ = sigterm.recv() => info!("Received SIGTERM"), + _ = sigint.recv() => info!("Received SIGINT"), + } + + self.shutdown_token.cancel(); + } + + async fn shutdown_gracefully(&mut self) -> Result<(), ApplicationError> { + info!("Starting graceful shutdown"); + + // Wait for all tasks to complete with timeout + let shutdown_timeout = tokio::time::Duration::from_secs(30); + + tokio::time::timeout(shutdown_timeout, async { + for handle in self.tasks.drain(..) 
{ + if let Err(e) = handle.await { + error!("Task failed during shutdown: {}", e); + } + } + }).await.map_err(|_| ApplicationError::ShutdownTimeout)?; + + info!("Graceful shutdown completed"); + Ok(()) + } +} +``` + +## ๐Ÿงช TESTING CONCURRENT CODE + +### Testing Async Functions +```rust +#[cfg(test)] +mod tests { + use super::*; + use tokio::time::{timeout, Duration}; + + #[tokio::test] + async fn test_workflow_cache_concurrent_access() { + let cache = WorkflowCache::new(); + let workflow = WorkflowDefinition::default(); + + // Test concurrent insertions + let mut handles = Vec::new(); + + for i in 0..10 { + let cache_clone = cache.clone(); + let workflow_clone = workflow.clone(); + + handles.push(tokio::spawn(async move { + cache_clone.insert(format!("workflow_{}", i), workflow_clone).await; + })); + } + + // Wait for all insertions + for handle in handles { + handle.await.unwrap(); + } + + // Verify all workflows were inserted + for i in 0..10 { + let result = cache.get(&format!("workflow_{}", i)).await; + assert!(result.is_some()); + } + } + + #[tokio::test] + async fn test_event_processor_with_timeout() { + let (processor, handle) = EventProcessor::new(); + + // Start processor in background + let processor_task = tokio::spawn(handle.run()); + + // Send test events + let event = WorkflowEvent::Started { + workflow_id: "test-workflow".to_string(), + timestamp: Utc::now(), + }; + + processor.send_event(event).unwrap(); + + // Test with timeout to prevent hanging + let result = timeout(Duration::from_secs(5), async { + // Give processor time to handle event + tokio::time::sleep(Duration::from_millis(100)).await; + }).await; + + assert!(result.is_ok()); + + // Cleanup + drop(processor); + let _ = timeout(Duration::from_secs(1), processor_task).await; + } +} +``` + +## ๐Ÿšจ CONCURRENCY ANTI-PATTERNS + +### What to Avoid +```rust +// โŒ Don't use std::sync in async contexts +// use std::sync::{Mutex, RwLock}; +// +// struct BadAsyncCache { +// data: std::sync::RwLock>, // Blocks async runtime +// } + +// โŒ Don't use parking_lot in async code +// use parking_lot::{RwLock, Mutex}; +// +// struct AlsoBadAsyncCache { +// data: parking_lot::RwLock>, // Also blocks +// } + +// โŒ Don't use Mutex for concurrent collections +// struct BadRegistry { +// data: Arc>>, // Use DashMap instead +// } + +// โŒ Don't forget to handle cancellation in long-running tasks +// tokio::spawn(async { +// loop { +// // This loop never checks for cancellation +// process_data().await; +// } +// }); + +// โŒ Don't block the async runtime +// async fn bad_function() { +// std::thread::sleep(Duration::from_secs(1)); // Blocks entire runtime +// } +``` + +## โœ… CONCURRENCY CHECKLIST + +```markdown +### Concurrency Implementation Verification +- [ ] Uses tokio::sync primitives (not std::sync or parking_lot) +- [ ] DashMap used for concurrent collections instead of Mutex +- [ ] All long-running tasks support cancellation +- [ ] No blocking operations in async contexts +- [ ] Proper error handling in concurrent code +- [ ] Graceful shutdown implemented +- [ ] Tests include concurrent access scenarios +- [ ] Structured concurrency with JoinSet for parallel tasks +- [ ] Appropriate channel types used (mpsc, broadcast, oneshot) +- [ ] All async functions properly awaited +- [ ] No unwrap/expect in concurrent code +- [ ] Timeouts used for potentially hanging operations +``` + +This concurrency standard ensures safe, efficient, and maintainable concurrent Rust applications using modern async/await patterns. 
diff --git a/.cursor/rules/rust/features/database.mdc b/.cursor/rules/rust/features/database.mdc
new file mode 100644
index 0000000..ed02956
--- /dev/null
+++ b/.cursor/rules/rust/features/database.mdc
@@ -0,0 +1,428 @@
+---
+description:
+globs:
+alwaysApply: false
+---
+# 🗄️ RUST DATABASE BEST PRACTICES
+
+> **TL;DR:** Comprehensive guidelines for database access in Rust using SQLx, focusing on type safety, async patterns, and testing strategies.
+
+## 🎯 DATABASE LIBRARY SELECTION
+
+### SQLx as the Standard
+- **Always use SQLx** - avoid `rusqlite`, `tokio-postgres`, or other lower-level libraries
+- **Support multiple databases** through SQLx's unified interface
+- **Prefer the runtime `query_as` API** over the compile-time `query!` macros (see below)
+- **Use async/await patterns** for all database operations
+
+```toml
+# Cargo.toml - SQLx configuration
+[dependencies]
+sqlx = { workspace = true, features = ["runtime-tokio-rustls", "postgres", "sqlite", "uuid", "chrono", "json"] }
+```
+
+## 🔧 QUERY PATTERNS
+
+### Use sqlx::query_as Instead of Macros
+```rust
+// ✅ Preferred: Use sqlx::query_as with custom types
+#[derive(Debug, Clone, sqlx::FromRow, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct WorkflowExecution {
+    pub id: Uuid,
+    pub workflow_id: String,
+    pub status: ExecutionStatus,
+    pub created_at: DateTime<Utc>,
+    pub updated_at: DateTime<Utc>,
+    pub metadata: Option<serde_json::Value>,
+}
+
+impl WorkflowExecution {
+    pub async fn find_by_id(
+        pool: &PgPool,
+        id: Uuid
+    ) -> Result<Option<WorkflowExecution>, sqlx::Error> {
+        sqlx::query_as::<_, WorkflowExecution>(
+            "SELECT id, workflow_id, status, created_at, updated_at, metadata
+             FROM workflow_executions
+             WHERE id = $1"
+        )
+        .bind(id)
+        .fetch_optional(pool)
+        .await
+    }
+
+    pub async fn list_by_workflow(
+        pool: &PgPool,
+        workflow_id: &str,
+        limit: i64,
+        offset: i64,
+    ) -> Result<Vec<WorkflowExecution>, sqlx::Error> {
+        sqlx::query_as::<_, WorkflowExecution>(
+            "SELECT id, workflow_id, status, created_at, updated_at, metadata
+             FROM workflow_executions
+             WHERE workflow_id = $1
+             ORDER BY created_at DESC
+             LIMIT $2 OFFSET $3"
+        )
+        .bind(workflow_id)
+        .bind(limit)
+        .bind(offset)
+        .fetch_all(pool)
+        .await
+    }
+}
+
+// ❌ Avoid: sqlx::query! 
macro (compile-time dependencies) +// let result = sqlx::query!("SELECT * FROM users WHERE id = $1", user_id) +// .fetch_one(pool) +// .await?; +``` + +### Entity Definition Patterns +```rust +use sqlx::{FromRow, PgPool, Row}; +use serde::{Deserialize, Serialize}; +use uuid::Uuid; +use chrono::{DateTime, Utc}; + +#[derive(Debug, Clone, FromRow, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct User { + pub id: Uuid, + pub username: String, + pub email: String, + pub created_at: DateTime, + pub updated_at: DateTime, + pub is_active: bool, +} + +#[derive(Debug, Clone, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct CreateUserRequest { + pub username: String, + pub email: String, +} + +#[derive(Debug, Clone, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct UpdateUserRequest { + pub username: Option, + pub email: Option, + pub is_active: Option, +} +``` + +## ๐Ÿ—๏ธ REPOSITORY PATTERN + +### Repository Implementation +```rust +use async_trait::async_trait; +use sqlx::PgPool; +use uuid::Uuid; + +#[async_trait] +pub trait UserRepository { + async fn create(&self, request: CreateUserRequest) -> Result; + async fn find_by_id(&self, id: Uuid) -> Result, sqlx::Error>; + async fn find_by_email(&self, email: &str) -> Result, sqlx::Error>; + async fn update(&self, id: Uuid, request: UpdateUserRequest) -> Result, sqlx::Error>; + async fn delete(&self, id: Uuid) -> Result; + async fn list(&self, limit: i64, offset: i64) -> Result, sqlx::Error>; +} + +pub struct PostgresUserRepository { + pool: PgPool, +} + +impl PostgresUserRepository { + pub fn new(pool: PgPool) -> Self { + Self { pool } + } +} + +#[async_trait] +impl UserRepository for PostgresUserRepository { + async fn create(&self, request: CreateUserRequest) -> Result { + let id = Uuid::new_v4(); + let now = Utc::now(); + + sqlx::query_as::<_, User>( + "INSERT INTO users (id, username, email, created_at, updated_at, is_active) + VALUES ($1, $2, $3, $4, $5, $6) + RETURNING id, username, email, created_at, updated_at, is_active" + ) + .bind(id) + .bind(&request.username) + .bind(&request.email) + .bind(now) + .bind(now) + .bind(true) + .fetch_one(&self.pool) + .await + } + + async fn find_by_id(&self, id: Uuid) -> Result, sqlx::Error> { + sqlx::query_as::<_, User>( + "SELECT id, username, email, created_at, updated_at, is_active + FROM users WHERE id = $1" + ) + .bind(id) + .fetch_optional(&self.pool) + .await + } + + async fn find_by_email(&self, email: &str) -> Result, sqlx::Error> { + sqlx::query_as::<_, User>( + "SELECT id, username, email, created_at, updated_at, is_active + FROM users WHERE email = $1" + ) + .bind(email) + .fetch_optional(&self.pool) + .await + } + + async fn update(&self, id: Uuid, request: UpdateUserRequest) -> Result, sqlx::Error> { + let now = Utc::now(); + + sqlx::query_as::<_, User>( + "UPDATE users + SET username = COALESCE($2, username), + email = COALESCE($3, email), + is_active = COALESCE($4, is_active), + updated_at = $5 + WHERE id = $1 + RETURNING id, username, email, created_at, updated_at, is_active" + ) + .bind(id) + .bind(request.username) + .bind(request.email) + .bind(request.is_active) + .bind(now) + .fetch_optional(&self.pool) + .await + } + + async fn delete(&self, id: Uuid) -> Result { + let result = sqlx::query("DELETE FROM users WHERE id = $1") + .bind(id) + .execute(&self.pool) + .await?; + + Ok(result.rows_affected() > 0) + } + + async fn list(&self, limit: i64, offset: i64) -> Result, sqlx::Error> { + sqlx::query_as::<_, User>( + "SELECT id, username, 
email, created_at, updated_at, is_active + FROM users + ORDER BY created_at DESC + LIMIT $1 OFFSET $2" + ) + .bind(limit) + .bind(offset) + .fetch_all(&self.pool) + .await + } +} +``` + +## ๐Ÿงช DATABASE TESTING + +### Using sqlx-db-tester for Tests +```rust +#[cfg(test)] +mod tests { + use super::*; + use sqlx_db_tester::TestPg; + + async fn setup_test_db() -> PgPool { + let tester = TestPg::new( + "postgres://postgres:password@localhost/test".to_string(), + std::path::Path::new("./migrations"), + ); + let pool = tester.get_pool().await; + + // Run migrations + sqlx::migrate!("./migrations").run(&pool).await.unwrap(); + + pool + } + + #[tokio::test] + async fn test_create_user() { + let pool = setup_test_db().await; + let repo = PostgresUserRepository::new(pool); + + let request = CreateUserRequest { + username: "testuser".to_string(), + email: "test@example.com".to_string(), + }; + + let user = repo.create(request).await.unwrap(); + assert_eq!(user.username, "testuser"); + assert_eq!(user.email, "test@example.com"); + assert!(user.is_active); + } + + #[tokio::test] + async fn test_find_user_by_email() { + let pool = setup_test_db().await; + let repo = PostgresUserRepository::new(pool); + + // Create a user first + let create_request = CreateUserRequest { + username: "findme".to_string(), + email: "findme@example.com".to_string(), + }; + let created_user = repo.create(create_request).await.unwrap(); + + // Find by email + let found_user = repo.find_by_email("findme@example.com").await.unwrap(); + assert!(found_user.is_some()); + assert_eq!(found_user.unwrap().id, created_user.id); + } + + #[tokio::test] + async fn test_update_user() { + let pool = setup_test_db().await; + let repo = PostgresUserRepository::new(pool); + + // Create a user + let create_request = CreateUserRequest { + username: "updateme".to_string(), + email: "updateme@example.com".to_string(), + }; + let user = repo.create(create_request).await.unwrap(); + + // Update the user + let update_request = UpdateUserRequest { + username: Some("updated_name".to_string()), + email: None, + is_active: Some(false), + }; + + let updated_user = repo.update(user.id, update_request).await.unwrap(); + assert!(updated_user.is_some()); + let updated_user = updated_user.unwrap(); + assert_eq!(updated_user.username, "updated_name"); + assert!(!updated_user.is_active); + } +} +``` + +## ๐Ÿ“‹ MIGRATION PATTERNS + +### Migration File Structure +```sql +-- migrations/20240501000001_create_users_table.sql +CREATE TABLE users ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + username VARCHAR(255) NOT NULL UNIQUE, + email VARCHAR(255) NOT NULL UNIQUE, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + is_active BOOLEAN NOT NULL DEFAULT true +); + +CREATE INDEX idx_users_email ON users(email); +CREATE INDEX idx_users_username ON users(username); +CREATE INDEX idx_users_created_at ON users(created_at); + +-- Add trigger for updated_at +CREATE OR REPLACE FUNCTION update_updated_at_column() +RETURNS TRIGGER AS $$ +BEGIN + NEW.updated_at = NOW(); + RETURN NEW; +END; +$$ language 'plpgsql'; + +CREATE TRIGGER update_users_updated_at + BEFORE UPDATE ON users + FOR EACH ROW + EXECUTE FUNCTION update_updated_at_column(); +``` + +## ๐Ÿ”ง CONNECTION MANAGEMENT + +### Database Pool Configuration +```rust +use sqlx::{postgres::PgPoolOptions, PgPool}; +use std::time::Duration; + +pub async fn create_connection_pool(database_url: &str) -> Result { + PgPoolOptions::new() + .max_connections(20) + .min_connections(5) 
+ .acquire_timeout(Duration::from_secs(30)) + .idle_timeout(Duration::from_secs(600)) + .max_lifetime(Duration::from_secs(1800)) + .connect(database_url) + .await +} + +// Application setup +pub async fn setup_database(config: &DatabaseConfig) -> Result> { + let pool = create_connection_pool(&config.url).await?; + + // Run migrations + sqlx::migrate!("./migrations").run(&pool).await?; + + Ok(pool) +} +``` + +## ๐Ÿšจ ANTI-PATTERNS TO AVOID + +### Database Anti-Patterns +```rust +// โŒ Don't use rusqlite or tokio-postgres directly +// use rusqlite::{Connection, Result}; +// use tokio_postgres::{NoTls, connect}; + +// โŒ Don't use sqlx::query! macro in production +// let user = sqlx::query!("SELECT * FROM users WHERE id = $1", id) +// .fetch_one(pool) +// .await?; + +// โŒ Don't create entities without FromRow +// struct User { +// id: Uuid, +// name: String, +// } +// // Missing: #[derive(FromRow)] + +// โŒ Don't forget serde camelCase configuration +// #[derive(Serialize, Deserialize)] +// struct User { // Missing: #[serde(rename_all = "camelCase")] +// user_id: String, +// } + +// โŒ Don't create repositories without async traits +// impl UserRepository { +// fn find_by_id(&self, id: Uuid) -> User { // Should be async +// // ... +// } +// } +``` + +## โœ… DATABASE CHECKLIST + +```markdown +### Database Implementation Verification +- [ ] Uses SQLx (not rusqlite/tokio-postgres) +- [ ] Entities derive `FromRow`, `Serialize`, `Deserialize` +- [ ] All serde structs use `#[serde(rename_all = "camelCase")]` +- [ ] Uses `sqlx::query_as` instead of `sqlx::query!` macro +- [ ] Repository pattern with async traits +- [ ] Comprehensive unit tests with sqlx-db-tester +- [ ] Migration files in `migrations/` directory +- [ ] Connection pool properly configured +- [ ] Proper error handling (no unwrap/expect) +- [ ] All database operations are async +- [ ] Tests cover CRUD operations +- [ ] Indexes defined for query performance +``` + +This database standard ensures type-safe, performant, and testable database access patterns across Rust applications. diff --git a/.cursor/rules/rust/features/tools-and-config.mdc b/.cursor/rules/rust/features/tools-and-config.mdc new file mode 100644 index 0000000..8f2a6e9 --- /dev/null +++ b/.cursor/rules/rust/features/tools-and-config.mdc @@ -0,0 +1,583 @@ +--- +description: +globs: +alwaysApply: false +--- +# ๐Ÿ› ๏ธ RUST TOOLS & CONFIGURATION BEST PRACTICES + +> **TL;DR:** Modern tooling choices and configuration patterns for Rust applications, focusing on maintainable and production-ready setups. 
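+
+The sections below define the individual pieces; as a quick overview, a typical startup path looks like the sketch here (illustrative, using the `init_tracing`, `AppConfig::load`, and `validate` helpers defined later in this file).
+
+```rust
+fn main() -> anyhow::Result<()> {
+    // Logging first, so configuration problems are reported through tracing
+    init_tracing();
+
+    // Load from config.yaml / environment, then validate before use
+    let config = AppConfig::load()?;
+    config.validate()?;
+
+    tracing::info!(port = config.server.port, "configuration loaded");
+    Ok(())
+}
+```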
+ +## ๐Ÿ“Š LOGGING AND OBSERVABILITY + +### Use Tracing Instead of env_logger +```rust +// โœ… Preferred: Use tracing ecosystem +use tracing::{info, warn, error, debug, instrument}; +use tracing_subscriber::{ + layer::SubscriberExt, + util::SubscriberInitExt, + EnvFilter, + Registry, +}; + +pub fn init_tracing() { + Registry::default() + .with( + tracing_subscriber::fmt::layer() + .with_target(false) + .compact() + ) + .with(EnvFilter::from_default_env()) + .init(); +} + +// Structured logging with context +#[instrument(skip(sensitive_data))] +pub async fn process_workflow( + workflow_id: &str, + user_id: &str, + sensitive_data: &[u8] +) -> Result<(), ProcessingError> { + info!("Starting workflow processing"); + + // Processing logic + debug!("Processing step 1 completed"); + + match perform_operation().await { + Ok(result) => { + info!(result_count = result.len(), "Processing completed successfully"); + Ok(()) + } + Err(e) => { + error!(error = %e, "Processing failed"); + Err(e) + } + } +} + +// โŒ Avoid: env_logger (deprecated pattern) +// use env_logger; +// env_logger::init(); +``` + +### Tracing Configuration +```rust +use tracing_appender::rolling::{RollingFileAppender, Rotation}; +use tracing_subscriber::{ + fmt::writer::MakeWriterExt, + layer::SubscriberExt, + util::SubscriberInitExt, +}; + +pub fn init_production_tracing(log_dir: &str) -> Result<(), Box> { + // File appender with rotation + let file_appender = RollingFileAppender::new( + Rotation::daily(), + log_dir, + "application.log" + ); + + // Console output for development + let console_layer = tracing_subscriber::fmt::layer() + .with_target(false) + .compact(); + + // File output for production + let file_layer = tracing_subscriber::fmt::layer() + .with_writer(file_appender) + .with_ansi(false) + .json(); + + tracing_subscriber::registry() + .with(console_layer) + .with(file_layer) + .with(EnvFilter::from_default_env()) + .init(); + + Ok(()) +} +``` + +## ๐Ÿ“„ CONFIGURATION MANAGEMENT + +### Use YAML Instead of TOML +```yaml +# config.yaml - Preferred configuration format +server: + host: "0.0.0.0" + port: 8080 + workers: 4 + +database: + url: "postgresql://user:pass@localhost/db" + maxConnections: 20 + minConnections: 5 + timeoutSecs: 30 + +logging: + level: "info" + format: "json" + directory: "./logs" + +features: + enableMetrics: true + enableTracing: true + debugMode: false + +nodes: + ai: + defaultModel: "gpt-4o" + maxTokens: 4096 + temperature: 0.7 + + crawler: + userAgent: "CellaBot/1.0" + timeout: 30 + maxRetries: 3 +``` + +### Configuration Loading Pattern +```rust +use serde::{Deserialize, Serialize}; +use std::fs; +use anyhow::{Context, Result}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct AppConfig { + pub server: ServerConfig, + pub database: DatabaseConfig, + pub logging: LoggingConfig, + pub features: FeatureConfig, + pub nodes: NodeConfig, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ServerConfig { + pub host: String, + pub port: u16, + pub workers: usize, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct DatabaseConfig { + pub url: String, + pub max_connections: u32, + pub min_connections: u32, + pub timeout_secs: u64, +} + +impl AppConfig { + pub fn load() -> Result { + // Try multiple config sources in order + Self::from_file("config.yaml") + .or_else(|_| Self::from_file("config.yml")) + .or_else(|_| Self::from_env()) + .context("Failed to 
load configuration from any source") + } + + pub fn from_file(path: &str) -> Result { + let content = fs::read_to_string(path) + .with_context(|| format!("Failed to read config file: {}", path))?; + + serde_yaml::from_str(&content) + .with_context(|| format!("Failed to parse config file: {}", path)) + } + + pub fn from_env() -> Result { + // Load from environment variables with fallbacks + Ok(Self { + server: ServerConfig { + host: std::env::var("SERVER_HOST") + .unwrap_or_else(|_| "0.0.0.0".to_string()), + port: std::env::var("SERVER_PORT") + .unwrap_or_else(|_| "8080".to_string()) + .parse() + .context("Invalid SERVER_PORT")?, + workers: std::env::var("SERVER_WORKERS") + .unwrap_or_else(|_| "4".to_string()) + .parse() + .context("Invalid SERVER_WORKERS")?, + }, + database: DatabaseConfig { + url: std::env::var("DATABASE_URL") + .context("DATABASE_URL environment variable required")?, + max_connections: std::env::var("DB_MAX_CONNECTIONS") + .unwrap_or_else(|_| "20".to_string()) + .parse() + .context("Invalid DB_MAX_CONNECTIONS")?, + min_connections: std::env::var("DB_MIN_CONNECTIONS") + .unwrap_or_else(|_| "5".to_string()) + .parse() + .context("Invalid DB_MIN_CONNECTIONS")?, + timeout_secs: std::env::var("DB_TIMEOUT_SECS") + .unwrap_or_else(|_| "30".to_string()) + .parse() + .context("Invalid DB_TIMEOUT_SECS")?, + }, + // ... other config sections + }) + } + + pub fn validate(&self) -> Result<()> { + if self.server.port == 0 { + anyhow::bail!("Server port cannot be 0"); + } + + if self.database.url.is_empty() { + anyhow::bail!("Database URL cannot be empty"); + } + + if self.server.workers == 0 { + anyhow::bail!("Server workers must be greater than 0"); + } + + Ok(()) + } +} + +impl Default for AppConfig { + fn default() -> Self { + Self { + server: ServerConfig { + host: "127.0.0.1".to_string(), + port: 8080, + workers: 4, + }, + database: DatabaseConfig { + url: "sqlite::memory:".to_string(), + max_connections: 20, + min_connections: 5, + timeout_secs: 30, + }, + // ... other default values + } + } +} +``` + +## ๐Ÿ”ง TEMPLATING WITH MINIJINJA + +### Use MiniJinja Instead of Handlebars +```rust +// โœ… Preferred: Use minijinja for templating +use minijinja::{Environment, context}; +use serde_json::Value; +use anyhow::Result; + +pub struct TemplateEngine { + env: Environment<'static>, +} + +impl TemplateEngine { + pub fn new() -> Self { + let mut env = Environment::new(); + + // Add custom filters + env.add_filter("format_date", format_date_filter); + env.add_filter("truncate", truncate_filter); + env.add_filter("json_path", json_path_filter); + + // Add custom functions + env.add_function("now", now_function); + env.add_function("uuid", uuid_function); + + Self { env } + } + + pub fn render_template(&self, template: &str, data: &Value) -> Result { + let tmpl = self.env.template_from_str(template)?; + let result = tmpl.render(context! { data => data })?; + Ok(result) + } + + pub fn add_template(&mut self, name: &str, source: &str) -> Result<()> { + self.env.add_template(name, source)?; + Ok(()) + } + + pub fn render_named(&self, name: &str, data: &Value) -> Result { + let tmpl = self.env.get_template(name)?; + let result = tmpl.render(context! 
{ data => data })?; + Ok(result) + } +} + +// Custom filters +fn format_date_filter(value: Value, format: Option) -> Result { + // Implementation for date formatting + todo!() +} + +fn truncate_filter(value: Value, length: Option) -> Result { + let text = value.as_str().unwrap_or(""); + let len = length.unwrap_or(100); + + if text.len() <= len { + Ok(text.to_string()) + } else { + Ok(format!("{}...", &text[..len])) + } +} + +fn json_path_filter(value: Value, path: String) -> Result { + // Use jsonpath-rust for extraction + use jsonpath_rust::JsonPathFinder; + + let finder = JsonPathFinder::from_str(&value.to_string(), &path) + .map_err(|e| minijinja::Error::new(minijinja::ErrorKind::InvalidOperation, e.to_string()))?; + + let result = finder.find(); + Ok(serde_json::to_value(result).unwrap_or(Value::Null)) +} + +// Custom functions +fn now_function(_args: Vec) -> Result { + use chrono::Utc; + Ok(Value::String(Utc::now().to_rfc3339())) +} + +fn uuid_function(_args: Vec) -> Result { + use uuid::Uuid; + Ok(Value::String(Uuid::new_v4().to_string())) +} + +// โŒ Avoid: handlebars (less modern) +// use handlebars::Handlebars; +``` + +### Template Usage Examples +```rust +use serde_json::json; + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_template_rendering() { + let engine = TemplateEngine::new(); + + let template = r#" + Hello {{ data.name }}! + + Your workflow "{{ data.workflow_name }}" has {{ data.node_count }} nodes. + + Results: + {% for result in data.results %} + - {{ result.node_name }}: {{ result.status }} + {% endfor %} + + Generated at: {{ now() }} + Request ID: {{ uuid() }} + "#; + + let data = json!({ + "name": "John Doe", + "workflow_name": "HackerNews Summary", + "node_count": 3, + "results": [ + {"node_name": "RSS Feed", "status": "success"}, + {"node_name": "Content Fetch", "status": "success"}, + {"node_name": "AI Summary", "status": "completed"} + ] + }); + + let result = engine.render_template(template, &data).unwrap(); + assert!(result.contains("Hello John Doe!")); + assert!(result.contains("HackerNews Summary")); + } + + #[test] + fn test_json_path_filter() { + let engine = TemplateEngine::new(); + + let template = r#" + Title: {{ data | json_path("$.title") }} + First item: {{ data | json_path("$.items[0].name") }} + "#; + + let data = json!({ + "title": "My Workflow", + "items": [ + {"name": "First Item", "value": 1}, + {"name": "Second Item", "value": 2} + ] + }); + + let result = engine.render_template(template, &data).unwrap(); + assert!(result.contains("Title: My Workflow")); + assert!(result.contains("First item: First Item")); + } +} +``` + +## ๐Ÿ” DATA TRANSFORMATION WITH JSONPATH + +### JsonPath Integration +```rust +use jsonpath_rust::{JsonPathFinder, JsonPathQuery}; +use serde_json::Value; +use anyhow::{Result, Context}; + +pub struct DataTransformer { + template_engine: TemplateEngine, +} + +impl DataTransformer { + pub fn new() -> Self { + Self { + template_engine: TemplateEngine::new(), + } + } + + /// Extract data using JSONPath + pub fn extract_json_path(&self, data: &Value, path: &str) -> Result { + let path_query = JsonPathQuery::from(path); + let result = data.path(&path_query) + .context("Failed to execute JSONPath query")?; + + Ok(serde_json::to_value(result)?) 
+ } + + /// Transform data using both JSONPath extraction and template rendering + pub fn transform(&self, input: &Value, config: &TransformConfig) -> Result { + match config { + TransformConfig::JsonPath { path } => { + self.extract_json_path(input, path) + } + TransformConfig::Template { template } => { + let rendered = self.template_engine.render_template(template, input)?; + Ok(Value::String(rendered)) + } + TransformConfig::Composite { extractions, template } => { + let mut extracted_data = serde_json::Map::new(); + + // Extract data using JSONPath + for (key, path) in extractions { + let value = self.extract_json_path(input, path)?; + extracted_data.insert(key.clone(), value); + } + + // Render template with extracted data + let data = Value::Object(extracted_data); + let rendered = self.template_engine.render_template(template, &data)?; + Ok(Value::String(rendered)) + } + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase", tag = "type")] +pub enum TransformConfig { + JsonPath { + path: String, + }, + Template { + template: String, + }, + Composite { + extractions: std::collections::HashMap, + template: String, + }, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_json_path_extraction() { + let transformer = DataTransformer::new(); + + let data = json!({ + "rss": { + "items": [ + {"title": "Article 1", "url": "http://example.com/1"}, + {"title": "Article 2", "url": "http://example.com/2"} + ] + } + }); + + // Extract URLs + let urls = transformer.extract_json_path(&data, "$.rss.items[*].url").unwrap(); + assert_eq!(urls, json!(["http://example.com/1", "http://example.com/2"])); + } + + #[test] + fn test_composite_transformation() { + let transformer = DataTransformer::new(); + + let data = json!({ + "articles": [ + {"title": "First", "content": "Content 1"}, + {"title": "Second", "content": "Content 2"} + ] + }); + + let config = TransformConfig::Composite { + extractions: [ + ("titles".to_string(), "$.articles[*].title".to_string()), + ("count".to_string(), "$.articles.length()".to_string()), + ].into_iter().collect(), + template: "Found {{ count }} articles: {{ titles | join(', ') }}".to_string(), + }; + + let result = transformer.transform(&data, &config).unwrap(); + let expected = "Found 2 articles: First, Second"; + assert_eq!(result, Value::String(expected.to_string())); + } +} +``` + +## ๐Ÿšจ CONFIGURATION ANTI-PATTERNS + +### What to Avoid +```rust +// โŒ Don't use TOML for complex configurations +// [server] +// host = "0.0.0.0" +// port = 8080 +// +// [database] +// url = "postgresql://..." 
+// # TOML becomes unwieldy for nested structures
+
+// ❌ Don't use env_logger
+// use env_logger;
+// env_logger::init(); // Use tracing instead
+
+// ❌ Don't use handlebars for new projects
+// use handlebars::Handlebars; // Use minijinja instead
+
+// ❌ Don't hardcode configuration values
+// let database_url = "postgresql://localhost/mydb"; // Use config files/env vars
+
+// ❌ Don't ignore configuration validation
+// pub fn load_config() -> Config {
+//     serde_yaml::from_str(&content).unwrap() // Add proper validation
+// }
+```
+
+## ✅ TOOLS & CONFIGURATION CHECKLIST
+
+```markdown
+### Tools & Configuration Verification
+- [ ] Uses tracing instead of env_logger
+- [ ] Configuration in YAML format (not TOML)
+- [ ] All config structs use #[serde(rename_all = "camelCase")]
+- [ ] Configuration validation implemented
+- [ ] Environment variable fallbacks provided
+- [ ] MiniJinja used for templating (not handlebars)
+- [ ] JSONPath integration for data extraction
+- [ ] Custom filters and functions in templates
+- [ ] Structured logging with context
+- [ ] File rotation for production logging
+- [ ] Configuration loading from multiple sources
+- [ ] Default values provided for all config options
+```
+
+This tools and configuration standard ensures modern, maintainable, and production-ready Rust applications with proper observability and flexible configuration management.