feature: use instructions for rules

Tyr Chen
2025-05-31 11:22:33 -07:00
parent 8801d986c4
commit d7d87f3445
5 changed files with 2398 additions and 0 deletions

View File

@@ -3,3 +3,264 @@ description:
globs:
alwaysApply: false
---
# 🦀 RUST CORE CODE QUALITY STANDARDS
> **TL;DR:** Essential code quality rules for all Rust projects, focusing on maintainable, production-ready code that follows modern Rust 2024 idioms.
## 🎯 FUNDAMENTAL PRINCIPLES
### Code Organization
- **Functionality-based files**: Use meaningful file names like `node.rs`, `workflow.rs`, `execution.rs` instead of generic `models.rs`, `traits.rs`, `types.rs`
- **Meaningful naming**: Avoid names like `WorkflowValidatorImpl` - use descriptive, specific names
- **File size limits**: Maximum 500 lines per file (excluding tests)
- **Function size**: Maximum 150 lines per function
- **Single Responsibility**: Each module should have one clear purpose
### Rust Edition and Safety
- **Always use Rust 2024 edition**
- **Never use `unsafe` code** - find safe alternatives
- **Production-ready code**: All code must be deployable and maintainable
- **No `unwrap()` or `expect()`** in production code - use proper error handling
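A minimal sketch of the preferred alternative, assuming `thiserror` is available in the workspace: return a typed error and let the caller decide.
```rust
use std::collections::HashMap;
use thiserror::Error;

#[derive(Debug, Error)]
pub enum ConfigError {
    #[error("missing config key: {0}")]
    MissingKey(String),
}

// ✅ Good: propagate a typed error instead of calling unwrap()
pub fn require_key(settings: &HashMap<String, String>, key: &str) -> Result<String, ConfigError> {
    settings
        .get(key)
        .cloned()
        .ok_or_else(|| ConfigError::MissingKey(key.to_string()))
}
```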
## 📦 DEPENDENCY MANAGEMENT
### Workspace Dependencies Priority
```toml
# Always prefer workspace dependencies first
[dependencies]
tokio = { workspace = true }
serde = { workspace = true, features = ["derive"] }
# Only add new dependencies if not available in workspace
# Request permission before modifying Cargo.toml
```
### Version Strategy
- **Always use latest versions** when adding new dependencies
- **Request permission** before modifying `Cargo.toml`
- **Check workspace first** - never duplicate dependencies unnecessarily
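For reference, a hypothetical workspace root showing where the shared versions live; member crates then inherit them with `{ workspace = true }`:
```toml
# Root Cargo.toml (sketch)
[workspace]
members = ["crates/*"]

[workspace.dependencies]
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
serde = { version = "1", features = ["derive"] }
```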
## 🏗️ CODE STRUCTURE PATTERNS
### Data Structure Organization
```rust
// ✅ Good: Functionality-based organization
// src/workflow.rs - All workflow-related types and logic
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")] // Always use camelCase for JSON serialization
pub struct WorkflowDefinition {
pub workflow_id: String,
pub display_name: String,
pub created_at: DateTime<Utc>,
}
// ✅ Good: Meaningful trait names
pub trait WorkflowValidator {
fn validate(&self, workflow: &WorkflowDefinition) -> Result<(), ValidationError>;
}
// ❌ Bad: Generic file organization
// src/models.rs, src/traits.rs, src/types.rs
// ❌ Bad: Poor naming
// struct WorkflowValidatorImpl
```
### Serde Configuration
```rust
// ✅ Always use camelCase for JSON serialization
#[derive(Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ApiResponse {
pub workflow_id: String,
pub created_at: DateTime<Utc>,
pub is_active: bool,
}
// This serializes to:
// {"workflowId": "...", "createdAt": "...", "isActive": true}
```
## 🔧 BUILD AND QUALITY CHECKS
### Mandatory Verification Steps
After completing any code changes, **always run in order**:
```bash
# 1. Build check
cargo build
# 2. Test execution
cargo test
# 3. Linting
cargo clippy
# All must pass before considering code complete
```
### Clippy Configuration
```toml
# Cargo.toml
[lints.clippy]
all = "warn"
pedantic = "warn"
nursery = "warn"
unwrap_used = "deny"
expect_used = "deny"
```
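In a workspace these lints can be declared once at the root (Cargo 1.74+) and inherited by each crate; a sketch:
```toml
# Root Cargo.toml (sketch)
[workspace.lints.clippy]
all = { level = "warn", priority = -1 }
unwrap_used = "deny"
expect_used = "deny"

# In each member crate's Cargo.toml:
# [lints]
# workspace = true
```
Since `unwrap_used = "deny"` also covers test code, a `clippy.toml` with `allow-unwrap-in-tests = true` keeps `unwrap()` usable inside `#[cfg(test)]` modules.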
## 🗂️ FILE NAMING CONVENTIONS
### Module Organization Patterns
```text
// ✅ Good: Feature-based modules
src/
├── workflow/
│ ├── mod.rs
│ ├── validator.rs // WorkflowValidator trait and implementations
│ ├── executor.rs // WorkflowExecutor logic
│ └── definition.rs // WorkflowDefinition types
├── node/
│ ├── mod.rs
│ ├── registry.rs // NodeRegistry (not NodeTypeRegistry)
│ └── executor.rs // Node execution logic
└── storage/
├── mod.rs
├── entities.rs // Database entities
└── repositories.rs // Data access patterns
```
### Naming Best Practices
```rust
// ✅ Good naming examples
pub struct WorkflowValidator; // Clear, specific
pub struct NodeExecutor; // Action-oriented
pub struct DatabaseConnection; // Descriptive
// ❌ Bad naming examples
pub struct WorkflowValidatorImpl; // Unnecessary "Impl" suffix
pub struct Helper; // Too generic
pub struct Manager; // Vague responsibility
```
## 🧪 TESTING STANDARDS
### Unit Test Placement
```rust
// ✅ Always place unit tests in the same file
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_workflow_validation() {
let validator = WorkflowValidator::new();
let workflow = WorkflowDefinition::default();
assert!(validator.validate(&workflow).is_ok());
}
}
// ❌ Don't create separate test files for unit tests
// tests/workflow_test.rs (this is for integration tests only)
```
### Test Naming
```rust
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_valid_workflow_passes_validation() {
// Test name clearly describes the scenario
}
#[test]
fn test_empty_workflow_id_returns_error() {
// Specific about what's being tested
}
}
```
## 📝 DOCUMENTATION STANDARDS
### Code Documentation
```rust
/// Validates workflow definitions according to business rules.
pub struct WorkflowValidator {
rules: Vec<ValidationRule>,
}
impl WorkflowValidator {
/// Validates a workflow definition against all configured rules.
///
/// # Examples
///
/// ```rust
/// let validator = WorkflowValidator::new();
/// let workflow = WorkflowDefinition::builder()
///     .workflow_id("test-workflow")
///     .build();
///
/// assert!(validator.validate(&workflow).is_ok());
/// ```
///
/// # Errors
///
/// Returns `ValidationError` if:
/// - Workflow ID is empty or invalid
/// - Required fields are missing
/// - Business rules are violated
pub fn validate(&self, workflow: &WorkflowDefinition) -> Result<(), ValidationError> {
// ...
Ok(())
}
}
```
## 🚨 ANTI-PATTERNS TO AVOID
### Code Organization Anti-Patterns
```rust
// ❌ Don't use generic file names
// src/models.rs - mixing unrelated types
// src/utils.rs - catch-all for random functions
// src/helpers.rs - unclear responsibility
// ❌ Don't use implementation suffixes
pub struct WorkflowValidatorImpl;
pub struct DatabaseManagerImpl;
// ❌ Don't mix concerns in single files
// src/app.rs containing database, validation, and HTTP logic
// ❌ Don't use overly long files
// Any file > 500 lines (excluding tests) needs refactoring
```
### Dependency Anti-Patterns
```toml
# ❌ Don't duplicate workspace dependencies
[dependencies]
tokio = "1.0" # Already in workspace

# ❌ Don't modify Cargo.toml without permission
# Always ask before adding new dependencies

# ❌ Don't use outdated versions
serde = "0.9" # Use latest stable
```
## ✅ QUALITY CHECKLIST
```markdown
### Code Quality Verification
- [ ] Uses Rust 2024 edition
- [ ] No `unsafe` code blocks
- [ ] No `unwrap()` or `expect()` in production code
- [ ] All data structures use `#[serde(rename_all = "camelCase")]`
- [ ] Files organized by functionality, not type
- [ ] Meaningful names (no "Impl" suffixes)
- [ ] Functions ≤ 150 lines
- [ ] Files ≤ 500 lines (excluding tests)
- [ ] Unit tests in same file as implementation
- [ ] `cargo build` passes
- [ ] `cargo test` passes
- [ ] `cargo clippy` passes with no warnings
- [ ] Public APIs documented with examples
- [ ] Workspace dependencies used when available
```
This code quality standard ensures consistent, maintainable, and production-ready Rust code across all projects.

View File

@@ -3,3 +3,604 @@ description:
globs:
alwaysApply: false
---
# 🌐 AXUM WEB FRAMEWORK BEST PRACTICES
> **TL;DR:** Modern async web development with Axum, focusing on structured APIs, proper error handling, and OpenAPI documentation.
## 🎯 AXUM PROJECT STRUCTURE
### Router Organization
```rust
use axum::{
extract::{Path, Query, State},
http::StatusCode,
response::Json,
routing::{get, post, put, delete},
Router,
};
use utoipa::{OpenApi, ToSchema};
use utoipa_axum::{router::OpenApiRouter, routes};
// ✅ Good: Modular router structure
pub fn create_app(state: AppState) -> Router {
let (router, api) = OpenApiRouter::with_openapi(ApiDoc::openapi())
.routes(routes!(health_check))
.nest("/api/v1/workflows", workflows::router())
.nest("/api/v1/nodes", nodes::router())
.nest("/api/v1/executions", executions::router())
.with_state(state)
.split_for_parts();
router.merge(utoipa_swagger_ui::SwaggerUi::new("/swagger-ui")
.url("/apidoc/openapi.json", api))
}
#[derive(OpenApi)]
#[openapi(
tags(
(name = "workflows", description = "Workflow management API"),
(name = "nodes", description = "Node management API"),
(name = "executions", description = "Execution management API")
)
)]
struct ApiDoc;
```
### Application State Pattern
```rust
use sqlx::PgPool;
use std::sync::Arc;
#[derive(Clone)]
pub struct AppState {
pub db: PgPool,
pub config: Arc<AppConfig>,
pub node_registry: Arc<NodeRegistry>,
pub event_bus: Arc<EventBus>,
}
impl AppState {
pub async fn new(config: AppConfig) -> Result<Self, AppError> {
let db = create_connection_pool(&config.database.url).await?;
// Run migrations
sqlx::migrate!("./migrations").run(&db).await?;
let node_registry = Arc::new(NodeRegistry::new());
let event_bus = Arc::new(EventBus::new(1000));
Ok(Self {
db,
config: Arc::new(config),
node_registry,
event_bus,
})
}
}
```
## 🔧 REQUEST/RESPONSE PATTERNS
### Path Parameters with Validation
```rust
use axum::extract::Path;
use serde::Deserialize;
use uuid::Uuid;
use utoipa::ToSchema;
// ✅ Good: Axum 0.8 path parameter syntax
#[utoipa::path(
get,
path = "/api/v1/workflows/{workflow_id}",
params(
("workflow_id" = Uuid, Path, description = "Workflow unique identifier")
),
responses(
(status = 200, description = "Workflow found", body = WorkflowResponse),
(status = 404, description = "Workflow not found", body = ErrorResponse)
),
tag = "workflows"
)]
pub async fn get_workflow(
State(state): State<AppState>,
Path(workflow_id): Path<Uuid>, // Note: {workflow_id} in route, not :workflow_id
) -> Result<Json<WorkflowResponse>, AppError> {
let workflow = state
.workflow_service
.get_by_id(workflow_id)
.await?
.ok_or(AppError::NotFound("Workflow not found".to_string()))?;
Ok(Json(WorkflowResponse::from(workflow)))
}
// ❌ Wrong: Axum 0.7 route syntax (outdated)
// .route("/api/v1/workflows/:workflow_id", get(get_workflow)) // Don't use :workflow_id
```
### Query Parameters with Defaults
```rust
use serde::Deserialize;
use utoipa::{IntoParams, ToSchema};
// IntoParams is required for use in #[utoipa::path(params(...))]
#[derive(Debug, Deserialize, ToSchema, IntoParams)]
#[serde(rename_all = "camelCase")]
pub struct ListQuery {
#[serde(default = "default_limit")]
pub limit: i64,
#[serde(default)]
pub offset: i64,
#[serde(default)]
pub sort_by: Option<String>,
#[serde(default)]
pub sort_order: Option<SortOrder>,
}
fn default_limit() -> i64 {
20
}
#[derive(Debug, Deserialize, ToSchema)]
#[serde(rename_all = "lowercase")]
pub enum SortOrder {
Asc,
Desc,
}
#[utoipa::path(
get,
path = "/api/v1/workflows",
params(ListQuery),
responses(
(status = 200, description = "List of workflows", body = WorkflowListResponse)
),
tag = "workflows"
)]
pub async fn list_workflows(
State(state): State<AppState>,
Query(query): Query<ListQuery>,
) -> Result<Json<WorkflowListResponse>, AppError> {
let workflows = state
.workflow_service
.list(query.limit, query.offset, query.sort_by, query.sort_order)
.await?;
Ok(Json(WorkflowListResponse { workflows }))
}
```
### JSON Request/Response with Validation
```rust
use axum::Json;
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;
use validator::Validate;
#[derive(Debug, Deserialize, Validate, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct CreateWorkflowRequest {
#[validate(length(min = 1, max = 100))]
pub name: String,
#[validate(length(max = 500))]
pub description: Option<String>,
#[serde(default)]
pub workflow_data: serde_json::Value,
#[serde(default)]
pub is_active: bool,
}
#[derive(Debug, Serialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct WorkflowResponse {
pub id: Uuid,
pub name: String,
pub description: Option<String>,
pub workflow_data: serde_json::Value,
pub is_active: bool,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
}
#[utoipa::path(
post,
path = "/api/v1/workflows",
request_body = CreateWorkflowRequest,
responses(
(status = 201, description = "Workflow created", body = WorkflowResponse),
(status = 400, description = "Invalid request", body = ErrorResponse),
(status = 422, description = "Validation error", body = ValidationErrorResponse)
),
tag = "workflows"
)]
pub async fn create_workflow(
State(state): State<AppState>,
Json(request): Json<CreateWorkflowRequest>,
) -> Result<(StatusCode, Json<WorkflowResponse>), AppError> {
// Validate request
request.validate()
.map_err(|e| AppError::Validation(e.to_string()))?;
let workflow = state
.workflow_service
.create(request)
.await?;
Ok((StatusCode::CREATED, Json(WorkflowResponse::from(workflow))))
}
```
## 🚨 ERROR HANDLING
### Centralized Error Types
```rust
use axum::{
http::StatusCode,
response::{IntoResponse, Response},
Json,
};
use serde::Serialize;
use thiserror::Error;
use utoipa::ToSchema;
#[derive(Error, Debug)]
pub enum AppError {
#[error("Database error: {0}")]
Database(#[from] sqlx::Error),
#[error("Validation error: {0}")]
Validation(String),
#[error("Not found: {0}")]
NotFound(String),
#[error("Unauthorized")]
Unauthorized,
#[error("Forbidden")]
Forbidden,
#[error("Internal server error: {0}")]
Internal(String),
#[error("Bad request: {0}")]
BadRequest(String),
}
#[derive(Serialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ErrorResponse {
pub error: String,
pub message: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub details: Option<serde_json::Value>,
}
impl IntoResponse for AppError {
fn into_response(self) -> Response {
let (status, error_message) = match &self {
AppError::Database(_) => (StatusCode::INTERNAL_SERVER_ERROR, "Database error"),
AppError::Validation(_) => (StatusCode::UNPROCESSABLE_ENTITY, "Validation error"),
AppError::NotFound(_) => (StatusCode::NOT_FOUND, "Not found"),
AppError::Unauthorized => (StatusCode::UNAUTHORIZED, "Unauthorized"),
AppError::Forbidden => (StatusCode::FORBIDDEN, "Forbidden"),
AppError::Internal(_) => (StatusCode::INTERNAL_SERVER_ERROR, "Internal server error"),
AppError::BadRequest(_) => (StatusCode::BAD_REQUEST, "Bad request"),
};
let error_response = ErrorResponse {
error: error_message.to_string(),
message: self.to_string(),
details: None,
};
(status, Json(error_response)).into_response()
}
}
```
### Request Validation Middleware
```rust
use axum::{
extract::Request,
http::StatusCode,
middleware::Next,
response::Response,
Router,
};
// Note: spelling out ServiceBuilder's full `Stack<...>` return type is
// impractical, so apply the middleware layers directly to the router
pub fn apply_middleware(router: Router) -> Router {
router
.layer(tower_http::cors::CorsLayer::permissive())
.layer(tower_http::trace::TraceLayer::new_for_http())
.layer(axum::middleware::from_fn(request_id_middleware))
.layer(tower_http::timeout::TimeoutLayer::new(
std::time::Duration::from_secs(30),
))
}
pub async fn request_id_middleware(
mut request: Request,
next: Next,
) -> Result<Response, StatusCode> {
let request_id = uuid::Uuid::new_v4().to_string();
// A freshly generated UUID is always a valid header value, so no unwrap()
if let Ok(value) = request_id.parse() {
request.headers_mut().insert("X-Request-ID", value);
}
Ok(next.run(request).await)
}
```
## 📚 OPENAPI DOCUMENTATION
### Complete API Documentation
```rust
use utoipa::{OpenApi, ToSchema};
use utoipa_axum::router::OpenApiRouter;
#[derive(OpenApi)]
#[openapi(
info(
title = "Workflow Engine API",
version = "1.0.0",
description = "A comprehensive workflow management system",
contact(
name = "API Support",
email = "support@example.com"
)
),
paths(
// Workflow endpoints
workflows::get_workflow,
workflows::list_workflows,
workflows::create_workflow,
workflows::update_workflow,
workflows::delete_workflow,
workflows::execute_workflow,
// Node endpoints
nodes::list_node_types,
nodes::get_node_schema,
// Execution endpoints
executions::get_execution,
executions::list_executions,
),
components(
schemas(
WorkflowResponse,
CreateWorkflowRequest,
UpdateWorkflowRequest,
WorkflowListResponse,
NodeTypeInfo,
ExecutionResponse,
ErrorResponse,
)
),
tags(
(name = "workflows", description = "Workflow management operations"),
(name = "nodes", description = "Node type information and schemas"),
(name = "executions", description = "Workflow execution tracking")
)
)]
pub struct ApiDoc;
// ✅ Good: Detailed endpoint documentation
#[utoipa::path(
put,
path = "/api/v1/workflows/{workflow_id}",
params(
("workflow_id" = Uuid, Path, description = "Workflow unique identifier")
),
request_body(
content = UpdateWorkflowRequest,
description = "Workflow update data",
content_type = "application/json"
),
responses(
(status = 200, description = "Workflow updated successfully", body = WorkflowResponse),
(status = 400, description = "Invalid request data", body = ErrorResponse),
(status = 404, description = "Workflow not found", body = ErrorResponse),
(status = 422, description = "Validation failed", body = ErrorResponse)
),
tag = "workflows",
summary = "Update workflow",
description = "Updates an existing workflow with new data. Only provided fields will be updated."
)]
pub async fn update_workflow(
State(state): State<AppState>,
Path(workflow_id): Path<Uuid>,
Json(request): Json<UpdateWorkflowRequest>,
) -> Result<Json<WorkflowResponse>, AppError> {
// Implementation
todo!()
}
```
## 🧪 TESTING AXUM HANDLERS
### Integration Testing
```rust
#[cfg(test)]
mod tests {
use super::*;
use axum::{
body::Body,
http::{Request, StatusCode},
};
use serde_json::json;
use tower::ServiceExt;
async fn setup_test_app() -> Router {
let state = AppState::new_test().await;
create_app(state)
}
#[tokio::test]
async fn test_create_workflow_success() {
let app = setup_test_app().await;
let request_body = json!({
"name": "Test Workflow",
"description": "A test workflow",
"workflowData": {},
"isActive": true
});
let request = Request::builder()
.method("POST")
.uri("/api/v1/workflows")
.header("Content-Type", "application/json")
.body(Body::from(request_body.to_string()))
.unwrap();
let response = app.oneshot(request).await.unwrap();
assert_eq!(response.status(), StatusCode::CREATED);
let body = axum::body::to_bytes(response.into_body(), usize::MAX).await.unwrap();
let workflow: WorkflowResponse = serde_json::from_slice(&body).unwrap();
assert_eq!(workflow.name, "Test Workflow");
}
#[tokio::test]
async fn test_get_workflow_not_found() {
let app = setup_test_app().await;
let non_existent_id = Uuid::new_v4();
let request = Request::builder()
.method("GET")
.uri(&format!("/api/v1/workflows/{}", non_existent_id))
.body(Body::empty())
.unwrap();
let response = app.oneshot(request).await.unwrap();
assert_eq!(response.status(), StatusCode::NOT_FOUND);
}
#[tokio::test]
async fn test_validation_error() {
let app = setup_test_app().await;
let invalid_request = json!({
"name": "", // Empty name should fail validation
"description": "Valid description"
});
let request = Request::builder()
.method("POST")
.uri("/api/v1/workflows")
.header("Content-Type", "application/json")
.body(Body::from(invalid_request.to_string()))
.unwrap();
let response = app.oneshot(request).await.unwrap();
assert_eq!(response.status(), StatusCode::UNPROCESSABLE_ENTITY);
}
}
```
## 🔧 SERVER CONFIGURATION
### Production Server Setup
```rust
use axum::serve;
use std::net::SocketAddr;
use tokio::net::TcpListener;
use tower_http::trace::TraceLayer;
use tracing::{info, error};
pub async fn start_server(config: AppConfig) -> Result<(), Box<dyn std::error::Error>> {
let state = AppState::new(config.clone()).await?;
let app = apply_middleware(create_app(state));
let addr = SocketAddr::from(([0, 0, 0, 0], config.server.port));
let listener = TcpListener::bind(addr).await?;
info!("Server starting on {}", addr);
serve(listener, app)
.await
.map_err(|e| {
error!("Server error: {}", e);
e.into()
})
}
// Graceful shutdown handling
pub async fn start_server_with_shutdown(
config: AppConfig,
shutdown_signal: impl std::future::Future<Output = ()> + Send + 'static,
) -> Result<(), Box<dyn std::error::Error>> {
let state = AppState::new(config.clone()).await?;
let app = apply_middleware(create_app(state));
let addr = SocketAddr::from(([0, 0, 0, 0], config.server.port));
let listener = TcpListener::bind(addr).await?;
info!("Server starting on {}", addr);
serve(listener, app)
.with_graceful_shutdown(shutdown_signal)
.await
.map_err(|e| {
error!("Server error: {}", e);
e.into()
})
}
```
## 🚨 AXUM ANTI-PATTERNS
### What to Avoid
```rust
// ❌ Don't use old Axum 0.7 path syntax
// #[utoipa::path(get, path = "/workflows/:id")] // Use {id} instead
// ❌ Don't forget Content-Type header for JSON endpoints
// Missing: .header("Content-Type", "application/json")
// ❌ Don't ignore validation
// pub async fn create_user(Json(request): Json<CreateUserRequest>) {
// // Missing: request.validate()?
// }
// ❌ Don't use std::sync in Axum handlers
// pub async fn bad_handler(State(state): State<Arc<Mutex<Data>>>) {
// let data = state.lock().unwrap(); // Blocks async runtime
// }
// ❌ Don't return bare strings for JSON APIs
// pub async fn bad_endpoint() -> String {
// "success".to_string() // Should return Json<Response>
// }
```
## ✅ AXUM CHECKLIST
```markdown
### Axum Implementation Verification
- [ ] Uses Axum 0.8 path syntax with {param} not :param
- [ ] All JSON structs use #[serde(rename_all = "camelCase")]
- [ ] Comprehensive OpenAPI documentation with utoipa
- [ ] Proper error handling with IntoResponse trait
- [ ] Request validation with validator crate
- [ ] Structured application state with Clone trait
- [ ] Integration tests for all endpoints
- [ ] Middleware stack includes CORS, tracing, timeouts
- [ ] Graceful shutdown implementation
- [ ] No blocking operations in async handlers
- [ ] Content-Type headers properly set
- [ ] HTTP status codes correctly used
```
This Axum standard ensures robust, well-documented, and maintainable web APIs following modern Rust async patterns.

View File

@@ -0,0 +1,525 @@
---
description:
globs:
alwaysApply: false
---
# ⚡ RUST CONCURRENCY BEST PRACTICES
> **TL;DR:** Modern async/await patterns and thread-safe data structures for high-performance Rust applications.
## 🎯 ASYNC RUNTIME SELECTION
### Tokio as the Standard
- **Always use Tokio** for async runtime
- **Use tokio::sync primitives** instead of std::sync for async code
- **Leverage async/await patterns** throughout the application
- **Avoid blocking operations** in async contexts (see the `spawn_blocking` sketch below)
```toml
# Cargo.toml - Tokio configuration
[dependencies]
tokio = { workspace = true, features = ["rt-multi-thread", "macros", "sync", "time", "fs"] }
```
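When CPU-heavy or otherwise blocking work is unavoidable, hand it to Tokio's blocking thread pool instead of running it inline in an async task. A minimal sketch — the fold below is a stand-in for real hashing:
```rust
use tokio::task;

// ✅ Good: offload blocking work so the async runtime keeps making progress
pub async fn hash_payload(payload: Vec<u8>) -> Result<String, task::JoinError> {
    task::spawn_blocking(move || {
        // Hypothetical CPU-bound computation running on the blocking pool
        let digest = payload
            .iter()
            .fold(0u64, |acc, b| acc.wrapping_mul(31).wrapping_add(u64::from(*b)));
        format!("{digest:016x}")
    })
    .await
}
```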
## 🔒 SYNCHRONIZATION PRIMITIVES
### Tokio Sync Over Std Sync
```rust
// ✅ Preferred: Use tokio synchronization primitives
use tokio::sync::{RwLock, Mutex, broadcast, mpsc, oneshot};
use std::sync::Arc;
// ✅ Good: Async-friendly RwLock (deriving Clone just shares the Arc)
#[derive(Clone)]
pub struct WorkflowCache {
data: Arc<RwLock<HashMap<String, WorkflowDefinition>>>,
}
impl WorkflowCache {
pub fn new() -> Self {
Self {
data: Arc::new(RwLock::new(HashMap::new())),
}
}
pub async fn get(&self, id: &str) -> Option<WorkflowDefinition> {
let data = self.data.read().await;
data.get(id).cloned()
}
pub async fn insert(&self, id: String, workflow: WorkflowDefinition) {
let mut data = self.data.write().await;
data.insert(id, workflow);
}
pub async fn remove(&self, id: &str) -> Option<WorkflowDefinition> {
let mut data = self.data.write().await;
data.remove(id)
}
}
// ❌ Avoid: std::sync in async contexts
// use std::sync::{RwLock, Mutex}; // Blocks async runtime
// use parking_lot::{RwLock, Mutex}; // Also blocking
```
### DashMap for Concurrent Collections
```rust
use dashmap::DashMap;
use std::sync::Arc;
// ✅ Preferred: DashMap for concurrent hash maps
pub struct NodeRegistry {
nodes: Arc<DashMap<String, Box<dyn NodeType>>>,
categories: Arc<DashMap<String, Vec<String>>>,
}
impl NodeRegistry {
pub fn new() -> Self {
Self {
nodes: Arc::new(DashMap::new()),
categories: Arc::new(DashMap::new()),
}
}
pub fn register_node(&self, id: String, node: Box<dyn NodeType>) {
let category = node.category().to_string();
// Insert the node
self.nodes.insert(id.clone(), node);
// Update category index
self.categories
.entry(category)
.or_insert_with(Vec::new)
.push(id);
}
pub fn get_node(&self, id: &str) -> Option<dashmap::mapref::one::Ref<String, Box<dyn NodeType>>> {
self.nodes.get(id)
}
pub fn list_by_category(&self, category: &str) -> Vec<String> {
self.categories
.get(category)
.map(|entry| entry.clone())
.unwrap_or_default()
}
pub fn list_all_nodes(&self) -> Vec<String> {
self.nodes.iter().map(|entry| entry.key().clone()).collect()
}
}
// ❌ Avoid: Mutex<HashMap> for concurrent access
// pub struct BadNodeRegistry {
// nodes: Arc<Mutex<HashMap<String, Box<dyn NodeType>>>>
// }
```
## 📡 CHANNEL PATTERNS
### Multi-Producer Single-Consumer (MPSC)
```rust
use tokio::sync::mpsc;
use tracing::{info, error};
pub struct EventProcessor {
sender: mpsc::UnboundedSender<WorkflowEvent>,
}
impl EventProcessor {
pub fn new() -> (Self, EventProcessorHandle) {
let (tx, rx) = mpsc::unbounded_channel();
let handle = EventProcessorHandle::new(rx);
let processor = Self { sender: tx };
(processor, handle)
}
pub fn send_event(&self, event: WorkflowEvent) -> Result<(), mpsc::error::SendError<WorkflowEvent>> {
self.sender.send(event)
}
}
pub struct EventProcessorHandle {
receiver: mpsc::UnboundedReceiver<WorkflowEvent>,
}
impl EventProcessorHandle {
fn new(receiver: mpsc::UnboundedReceiver<WorkflowEvent>) -> Self {
Self { receiver }
}
pub async fn run(mut self) {
while let Some(event) = self.receiver.recv().await {
if let Err(e) = self.process_event(event).await {
error!("Failed to process event: {}", e);
}
}
info!("Event processor stopped");
}
async fn process_event(&self, event: WorkflowEvent) -> Result<(), ProcessingError> {
match event {
WorkflowEvent::Started { workflow_id, .. } => {
info!("Workflow {} started", workflow_id);
// Process workflow start
}
WorkflowEvent::Completed { workflow_id, .. } => {
info!("Workflow {} completed", workflow_id);
// Process workflow completion
}
WorkflowEvent::Failed { workflow_id, error, .. } => {
error!("Workflow {} failed: {}", workflow_id, error);
// Process workflow failure
}
}
Ok(())
}
}
```
### Broadcast for Multiple Subscribers
```rust
use tokio::sync::broadcast;
pub struct EventBus {
sender: broadcast::Sender<SystemEvent>,
}
impl EventBus {
pub fn new(capacity: usize) -> Self {
let (sender, _) = broadcast::channel(capacity);
Self { sender }
}
pub fn publish(&self, event: SystemEvent) -> Result<usize, broadcast::error::SendError<SystemEvent>> {
self.sender.send(event)
}
pub fn subscribe(&self) -> broadcast::Receiver<SystemEvent> {
self.sender.subscribe()
}
}
// Usage example
pub async fn start_event_monitoring(event_bus: Arc<EventBus>) {
let mut receiver = event_bus.subscribe();
tokio::spawn(async move {
while let Ok(event) = receiver.recv().await {
match event {
SystemEvent::NodeExecutionStarted { node_id, .. } => {
info!("Node {} started execution", node_id);
}
SystemEvent::NodeExecutionCompleted { node_id, .. } => {
info!("Node {} completed execution", node_id);
}
SystemEvent::SystemShutdown => {
info!("System shutdown requested");
break;
}
}
}
});
}
```
### Oneshot for Single Response
```rust
use tokio::sync::oneshot;
pub struct AsyncValidator {
// Internal state
}
impl AsyncValidator {
pub async fn validate_workflow(&self, workflow: WorkflowDefinition) -> Result<ValidationResult, ValidationError> {
let (tx, rx) = oneshot::channel();
// Spawn validation task
let workflow_clone = workflow.clone();
tokio::spawn(async move {
let result = perform_validation(workflow_clone).await;
let _ = tx.send(result);
});
// Wait for validation result
rx.await
.map_err(|_| ValidationError::Internal("Validation task cancelled".to_string()))?
}
}
async fn perform_validation(workflow: WorkflowDefinition) -> Result<ValidationResult, ValidationError> {
// Expensive validation logic
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
if workflow.nodes.is_empty() {
return Err(ValidationError::EmptyWorkflow);
}
Ok(ValidationResult::Valid)
}
```
## 🏃 TASK MANAGEMENT
### Structured Concurrency with JoinSet
```rust
use tokio::task::JoinSet;
use std::collections::HashMap;
#[derive(Clone)] // cloning shares Arc-wrapped internals
pub struct WorkflowExecutor {
// Internal state
}
impl WorkflowExecutor {
pub async fn execute_workflow_parallel(&self, workflow: &WorkflowDefinition) -> Result<ExecutionResult, ExecutionError> {
let mut join_set = JoinSet::new();
let mut results = HashMap::new();
// Execute nodes in parallel where possible
for node in &workflow.nodes {
if self.can_execute_parallel(node, &results) {
let node_clone = node.clone();
let executor = self.clone();
join_set.spawn(async move {
let result = executor.execute_node(&node_clone).await;
(node_clone.id.clone(), result)
});
}
}
// Collect results
while let Some(result) = join_set.join_next().await {
match result {
Ok((node_id, execution_result)) => {
results.insert(node_id, execution_result?);
}
Err(join_error) => {
return Err(ExecutionError::TaskFailed(join_error.to_string()));
}
}
}
Ok(ExecutionResult { node_results: results })
}
fn can_execute_parallel(&self, node: &NodeDefinition, completed_results: &HashMap<String, NodeResult>) -> bool {
// Check if all dependencies are satisfied
node.dependencies.iter().all(|dep| completed_results.contains_key(dep))
}
}
```
### Graceful Shutdown Pattern
```rust
use tokio::sync::broadcast;
use tokio_util::sync::CancellationToken;
pub struct Application {
shutdown_token: CancellationToken,
tasks: Vec<tokio::task::JoinHandle<()>>,
}
impl Application {
pub fn new() -> Self {
Self {
shutdown_token: CancellationToken::new(),
tasks: Vec::new(),
}
}
pub async fn start(&mut self) -> Result<(), ApplicationError> {
// Start background services
self.start_workflow_executor().await?;
self.start_event_processor().await?;
self.start_health_monitor().await?;
// Wait for shutdown signal
self.wait_for_shutdown().await;
// Graceful shutdown
self.shutdown_gracefully().await
}
async fn start_workflow_executor(&mut self) -> Result<(), ApplicationError> {
let token = self.shutdown_token.clone();
let handle = tokio::spawn(async move {
loop {
tokio::select! {
_ = token.cancelled() => {
info!("Workflow executor shutdown requested");
break;
}
_ = tokio::time::sleep(tokio::time::Duration::from_secs(1)) => {
// Process workflows
}
}
}
});
self.tasks.push(handle);
Ok(())
}
async fn wait_for_shutdown(&self) {
use tokio::signal::unix::{signal, SignalKind};
// Listen for shutdown signals; avoid unwrap() per the error-handling rules
// and fall back to immediate cancellation if handler setup fails
match (signal(SignalKind::terminate()), signal(SignalKind::interrupt())) {
(Ok(mut sigterm), Ok(mut sigint)) => {
tokio::select! {
_ = sigterm.recv() => info!("Received SIGTERM"),
_ = sigint.recv() => info!("Received SIGINT"),
}
}
_ => error!("Failed to install signal handlers"),
}
self.shutdown_token.cancel();
}
async fn shutdown_gracefully(&mut self) -> Result<(), ApplicationError> {
info!("Starting graceful shutdown");
// Wait for all tasks to complete with timeout
let shutdown_timeout = tokio::time::Duration::from_secs(30);
tokio::time::timeout(shutdown_timeout, async {
for handle in self.tasks.drain(..) {
if let Err(e) = handle.await {
error!("Task failed during shutdown: {}", e);
}
}
}).await.map_err(|_| ApplicationError::ShutdownTimeout)?;
info!("Graceful shutdown completed");
Ok(())
}
}
```
## 🧪 TESTING CONCURRENT CODE
### Testing Async Functions
```rust
#[cfg(test)]
mod tests {
use super::*;
use tokio::time::{timeout, Duration};
#[tokio::test]
async fn test_workflow_cache_concurrent_access() {
let cache = WorkflowCache::new();
let workflow = WorkflowDefinition::default();
// Test concurrent insertions
let mut handles = Vec::new();
for i in 0..10 {
let cache_clone = cache.clone();
let workflow_clone = workflow.clone();
handles.push(tokio::spawn(async move {
cache_clone.insert(format!("workflow_{}", i), workflow_clone).await;
}));
}
// Wait for all insertions
for handle in handles {
handle.await.unwrap();
}
// Verify all workflows were inserted
for i in 0..10 {
let result = cache.get(&format!("workflow_{}", i)).await;
assert!(result.is_some());
}
}
#[tokio::test]
async fn test_event_processor_with_timeout() {
let (processor, handle) = EventProcessor::new();
// Start processor in background
let processor_task = tokio::spawn(handle.run());
// Send test events
let event = WorkflowEvent::Started {
workflow_id: "test-workflow".to_string(),
timestamp: Utc::now(),
};
processor.send_event(event).unwrap();
// Test with timeout to prevent hanging
let result = timeout(Duration::from_secs(5), async {
// Give processor time to handle event
tokio::time::sleep(Duration::from_millis(100)).await;
}).await;
assert!(result.is_ok());
// Cleanup
drop(processor);
let _ = timeout(Duration::from_secs(1), processor_task).await;
}
}
```
## 🚨 CONCURRENCY ANTI-PATTERNS
### What to Avoid
```rust
// ❌ Don't use std::sync in async contexts
// use std::sync::{Mutex, RwLock};
//
// struct BadAsyncCache {
// data: std::sync::RwLock<HashMap<String, Value>>, // Blocks async runtime
// }
// ❌ Don't use parking_lot in async code
// use parking_lot::{RwLock, Mutex};
//
// struct AlsoBadAsyncCache {
// data: parking_lot::RwLock<HashMap<String, Value>>, // Also blocks
// }
// ❌ Don't use Mutex<HashMap> for concurrent collections
// struct BadRegistry {
// data: Arc<Mutex<HashMap<String, Value>>>, // Use DashMap instead
// }
// ❌ Don't forget to handle cancellation in long-running tasks
// tokio::spawn(async {
// loop {
// // This loop never checks for cancellation
// process_data().await;
// }
// });
// ❌ Don't block the async runtime
// async fn bad_function() {
// std::thread::sleep(Duration::from_secs(1)); // Blocks entire runtime
// }
```
## ✅ CONCURRENCY CHECKLIST
```markdown
### Concurrency Implementation Verification
- [ ] Uses tokio::sync primitives (not std::sync or parking_lot)
- [ ] DashMap used for concurrent collections instead of Mutex<HashMap>
- [ ] All long-running tasks support cancellation
- [ ] No blocking operations in async contexts
- [ ] Proper error handling in concurrent code
- [ ] Graceful shutdown implemented
- [ ] Tests include concurrent access scenarios
- [ ] Structured concurrency with JoinSet for parallel tasks
- [ ] Appropriate channel types used (mpsc, broadcast, oneshot)
- [ ] All async functions properly awaited
- [ ] No unwrap/expect in concurrent code
- [ ] Timeouts used for potentially hanging operations
```
This concurrency standard ensures safe, efficient, and maintainable concurrent Rust applications using modern async/await patterns.

View File

@@ -0,0 +1,428 @@
---
description:
globs:
alwaysApply: false
---
# 🗄️ RUST DATABASE BEST PRACTICES
> **TL;DR:** Comprehensive guidelines for database access in Rust using SQLx, focusing on type safety, async patterns, and testing strategies.
## 🎯 DATABASE LIBRARY SELECTION
### SQLx as the Standard
- **Always use SQLx** - avoid `rusqlite`, `tokio-postgres`, or other lower-level libraries
- **Support multiple databases** through SQLx's unified interface (see the executor-generic sketch below)
- **Prefer runtime-checked `sqlx::query_as`** over the compile-time `query!` macros, which tie every build to a live database
- **Use async/await patterns** for all database operations
```toml
# Cargo.toml - SQLx configuration
[dependencies]
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "postgres", "sqlite", "uuid", "chrono", "json"] }
```
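One payoff of the unified interface: functions can be generic over any executor (pool, connection, or transaction). A sketch assuming a Postgres `users` table:
```rust
use sqlx::{Executor, Postgres};

// Accepts &PgPool, &mut PgConnection, or &mut Transaction<'_, Postgres>
pub async fn count_users<'e, E>(executor: E) -> Result<i64, sqlx::Error>
where
    E: Executor<'e, Database = Postgres>,
{
    // Tuples implement FromRow, so query_as works without a named struct
    let row: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM users")
        .fetch_one(executor)
        .await?;
    Ok(row.0)
}
```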
## 🔧 QUERY PATTERNS
### Use sqlx::query_as Instead of Macros
```rust
// ✅ Preferred: Use sqlx::query_as with custom types
#[derive(Debug, Clone, sqlx::FromRow, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct WorkflowExecution {
pub id: Uuid,
pub workflow_id: String,
pub status: ExecutionStatus,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
pub metadata: Option<serde_json::Value>,
}
impl WorkflowExecution {
pub async fn find_by_id(
pool: &PgPool,
id: Uuid
) -> Result<Option<Self>, sqlx::Error> {
sqlx::query_as::<_, WorkflowExecution>(
"SELECT id, workflow_id, status, created_at, updated_at, metadata
FROM workflow_executions
WHERE id = $1"
)
.bind(id)
.fetch_optional(pool)
.await
}
pub async fn list_by_workflow(
pool: &PgPool,
workflow_id: &str,
limit: i64,
offset: i64,
) -> Result<Vec<Self>, sqlx::Error> {
sqlx::query_as::<_, WorkflowExecution>(
"SELECT id, workflow_id, status, created_at, updated_at, metadata
FROM workflow_executions
WHERE workflow_id = $1
ORDER BY created_at DESC
LIMIT $2 OFFSET $3"
)
.bind(workflow_id)
.bind(limit)
.bind(offset)
.fetch_all(pool)
.await
}
}
// ❌ Avoid: sqlx::query! macro (compile-time dependencies)
// let result = sqlx::query!("SELECT * FROM users WHERE id = $1", user_id)
// .fetch_one(pool)
// .await?;
```
### Entity Definition Patterns
```rust
use sqlx::{FromRow, PgPool, Row};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use chrono::{DateTime, Utc};
#[derive(Debug, Clone, FromRow, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct User {
pub id: Uuid,
pub username: String,
pub email: String,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
pub is_active: bool,
}
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CreateUserRequest {
pub username: String,
pub email: String,
}
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct UpdateUserRequest {
pub username: Option<String>,
pub email: Option<String>,
pub is_active: Option<bool>,
}
```
## 🏗️ REPOSITORY PATTERN
### Repository Implementation
```rust
use async_trait::async_trait;
use sqlx::PgPool;
use uuid::Uuid;
#[async_trait]
pub trait UserRepository {
async fn create(&self, request: CreateUserRequest) -> Result<User, sqlx::Error>;
async fn find_by_id(&self, id: Uuid) -> Result<Option<User>, sqlx::Error>;
async fn find_by_email(&self, email: &str) -> Result<Option<User>, sqlx::Error>;
async fn update(&self, id: Uuid, request: UpdateUserRequest) -> Result<Option<User>, sqlx::Error>;
async fn delete(&self, id: Uuid) -> Result<bool, sqlx::Error>;
async fn list(&self, limit: i64, offset: i64) -> Result<Vec<User>, sqlx::Error>;
}
pub struct PostgresUserRepository {
pool: PgPool,
}
impl PostgresUserRepository {
pub fn new(pool: PgPool) -> Self {
Self { pool }
}
}
#[async_trait]
impl UserRepository for PostgresUserRepository {
async fn create(&self, request: CreateUserRequest) -> Result<User, sqlx::Error> {
let id = Uuid::new_v4();
let now = Utc::now();
sqlx::query_as::<_, User>(
"INSERT INTO users (id, username, email, created_at, updated_at, is_active)
VALUES ($1, $2, $3, $4, $5, $6)
RETURNING id, username, email, created_at, updated_at, is_active"
)
.bind(id)
.bind(&request.username)
.bind(&request.email)
.bind(now)
.bind(now)
.bind(true)
.fetch_one(&self.pool)
.await
}
async fn find_by_id(&self, id: Uuid) -> Result<Option<User>, sqlx::Error> {
sqlx::query_as::<_, User>(
"SELECT id, username, email, created_at, updated_at, is_active
FROM users WHERE id = $1"
)
.bind(id)
.fetch_optional(&self.pool)
.await
}
async fn find_by_email(&self, email: &str) -> Result<Option<User>, sqlx::Error> {
sqlx::query_as::<_, User>(
"SELECT id, username, email, created_at, updated_at, is_active
FROM users WHERE email = $1"
)
.bind(email)
.fetch_optional(&self.pool)
.await
}
async fn update(&self, id: Uuid, request: UpdateUserRequest) -> Result<Option<User>, sqlx::Error> {
let now = Utc::now();
sqlx::query_as::<_, User>(
"UPDATE users
SET username = COALESCE($2, username),
email = COALESCE($3, email),
is_active = COALESCE($4, is_active),
updated_at = $5
WHERE id = $1
RETURNING id, username, email, created_at, updated_at, is_active"
)
.bind(id)
.bind(request.username)
.bind(request.email)
.bind(request.is_active)
.bind(now)
.fetch_optional(&self.pool)
.await
}
async fn delete(&self, id: Uuid) -> Result<bool, sqlx::Error> {
let result = sqlx::query("DELETE FROM users WHERE id = $1")
.bind(id)
.execute(&self.pool)
.await?;
Ok(result.rows_affected() > 0)
}
async fn list(&self, limit: i64, offset: i64) -> Result<Vec<User>, sqlx::Error> {
sqlx::query_as::<_, User>(
"SELECT id, username, email, created_at, updated_at, is_active
FROM users
ORDER BY created_at DESC
LIMIT $1 OFFSET $2"
)
.bind(limit)
.bind(offset)
.fetch_all(&self.pool)
.await
}
}
```
## 🧪 DATABASE TESTING
### Using sqlx-db-tester for Tests
```rust
#[cfg(test)]
mod tests {
use super::*;
use sqlx_db_tester::TestPg;
async fn setup_test_db() -> (TestPg, PgPool) {
// TestPg creates a throwaway database (the URL points at the server, not a
// specific database) and applies ./migrations when the pool is created.
// Return the tester as well: dropping it drops the test database.
let tester = TestPg::new(
"postgres://postgres:password@localhost".to_string(),
std::path::Path::new("./migrations"),
);
let pool = tester.get_pool().await;
(tester, pool)
}
#[tokio::test]
async fn test_create_user() {
let (_tester, pool) = setup_test_db().await;
let repo = PostgresUserRepository::new(pool);
let request = CreateUserRequest {
username: "testuser".to_string(),
email: "test@example.com".to_string(),
};
let user = repo.create(request).await.unwrap();
assert_eq!(user.username, "testuser");
assert_eq!(user.email, "test@example.com");
assert!(user.is_active);
}
#[tokio::test]
async fn test_find_user_by_email() {
let (_tester, pool) = setup_test_db().await;
let repo = PostgresUserRepository::new(pool);
// Create a user first
let create_request = CreateUserRequest {
username: "findme".to_string(),
email: "findme@example.com".to_string(),
};
let created_user = repo.create(create_request).await.unwrap();
// Find by email
let found_user = repo.find_by_email("findme@example.com").await.unwrap();
assert!(found_user.is_some());
assert_eq!(found_user.unwrap().id, created_user.id);
}
#[tokio::test]
async fn test_update_user() {
let (_tester, pool) = setup_test_db().await;
let repo = PostgresUserRepository::new(pool);
// Create a user
let create_request = CreateUserRequest {
username: "updateme".to_string(),
email: "updateme@example.com".to_string(),
};
let user = repo.create(create_request).await.unwrap();
// Update the user
let update_request = UpdateUserRequest {
username: Some("updated_name".to_string()),
email: None,
is_active: Some(false),
};
let updated_user = repo.update(user.id, update_request).await.unwrap();
assert!(updated_user.is_some());
let updated_user = updated_user.unwrap();
assert_eq!(updated_user.username, "updated_name");
assert!(!updated_user.is_active);
}
}
```
## 📋 MIGRATION PATTERNS
### Migration File Structure
```sql
-- migrations/20240501000001_create_users_table.sql
CREATE TABLE users (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
username VARCHAR(255) NOT NULL UNIQUE,
email VARCHAR(255) NOT NULL UNIQUE,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
is_active BOOLEAN NOT NULL DEFAULT true
);
CREATE INDEX idx_users_email ON users(email);
CREATE INDEX idx_users_username ON users(username);
CREATE INDEX idx_users_created_at ON users(created_at);
-- Add trigger for updated_at
CREATE OR REPLACE FUNCTION update_updated_at_column()
RETURNS TRIGGER AS $$
BEGIN
NEW.updated_at = NOW();
RETURN NEW;
END;
$$ language 'plpgsql';
CREATE TRIGGER update_users_updated_at
BEFORE UPDATE ON users
FOR EACH ROW
EXECUTE FUNCTION update_updated_at_column();
```
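Migrations like the one above are typically scaffolded and applied with sqlx-cli (assuming it is installed via `cargo install sqlx-cli`):
```bash
# Create a new timestamped migration file under ./migrations
sqlx migrate add create_users_table
# Apply pending migrations against $DATABASE_URL
sqlx migrate run
```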
## 🔧 CONNECTION MANAGEMENT
### Database Pool Configuration
```rust
use sqlx::{postgres::PgPoolOptions, PgPool};
use std::time::Duration;
pub async fn create_connection_pool(database_url: &str) -> Result<PgPool, sqlx::Error> {
PgPoolOptions::new()
.max_connections(20)
.min_connections(5)
.acquire_timeout(Duration::from_secs(30))
.idle_timeout(Duration::from_secs(600))
.max_lifetime(Duration::from_secs(1800))
.connect(database_url)
.await
}
// Application setup
pub async fn setup_database(config: &DatabaseConfig) -> Result<PgPool, Box<dyn std::error::Error>> {
let pool = create_connection_pool(&config.url).await?;
// Run migrations
sqlx::migrate!("./migrations").run(&pool).await?;
Ok(pool)
}
```
## 🚨 ANTI-PATTERNS TO AVOID
### Database Anti-Patterns
```rust
// ❌ Don't use rusqlite or tokio-postgres directly
// use rusqlite::{Connection, Result};
// use tokio_postgres::{NoTls, connect};
// ❌ Don't use sqlx::query! macro in production
// let user = sqlx::query!("SELECT * FROM users WHERE id = $1", id)
// .fetch_one(pool)
// .await?;
// ❌ Don't create entities without FromRow
// struct User {
// id: Uuid,
// name: String,
// }
// // Missing: #[derive(FromRow)]
// ❌ Don't forget serde camelCase configuration
// #[derive(Serialize, Deserialize)]
// struct User { // Missing: #[serde(rename_all = "camelCase")]
// user_id: String,
// }
// ❌ Don't create repositories without async traits
// impl UserRepository {
// fn find_by_id(&self, id: Uuid) -> User { // Should be async
// // ...
// }
// }
```
## ✅ DATABASE CHECKLIST
```markdown
### Database Implementation Verification
- [ ] Uses SQLx (not rusqlite/tokio-postgres)
- [ ] Entities derive `FromRow`, `Serialize`, `Deserialize`
- [ ] All serde structs use `#[serde(rename_all = "camelCase")]`
- [ ] Uses `sqlx::query_as` instead of `sqlx::query!` macro
- [ ] Repository pattern with async traits
- [ ] Comprehensive unit tests with sqlx-db-tester
- [ ] Migration files in `migrations/` directory
- [ ] Connection pool properly configured
- [ ] Proper error handling (no unwrap/expect)
- [ ] All database operations are async
- [ ] Tests cover CRUD operations
- [ ] Indexes defined for query performance
```
This database standard ensures type-safe, performant, and testable database access patterns across Rust applications.

View File

@@ -0,0 +1,583 @@
---
description:
globs:
alwaysApply: false
---
# 🛠️ RUST TOOLS & CONFIGURATION BEST PRACTICES
> **TL;DR:** Modern tooling choices and configuration patterns for Rust applications, focusing on maintainable and production-ready setups.
## 📊 LOGGING AND OBSERVABILITY
### Use Tracing Instead of env_logger
```rust
// ✅ Preferred: Use tracing ecosystem
use tracing::{info, warn, error, debug, instrument};
use tracing_subscriber::{
layer::SubscriberExt,
util::SubscriberInitExt,
EnvFilter,
Registry,
};
pub fn init_tracing() {
Registry::default()
.with(
tracing_subscriber::fmt::layer()
.with_target(false)
.compact()
)
.with(EnvFilter::from_default_env())
.init();
}
// Structured logging with context
#[instrument(skip(sensitive_data))]
pub async fn process_workflow(
workflow_id: &str,
user_id: &str,
sensitive_data: &[u8]
) -> Result<(), ProcessingError> {
info!("Starting workflow processing");
// Processing logic
debug!("Processing step 1 completed");
match perform_operation().await {
Ok(result) => {
info!(result_count = result.len(), "Processing completed successfully");
Ok(())
}
Err(e) => {
error!(error = %e, "Processing failed");
Err(e)
}
}
}
// ❌ Avoid: env_logger (deprecated pattern)
// use env_logger;
// env_logger::init();
```
### Tracing Configuration
```rust
use tracing_appender::rolling::{RollingFileAppender, Rotation};
use tracing_subscriber::{
layer::SubscriberExt,
util::SubscriberInitExt,
EnvFilter,
};
pub fn init_production_tracing(log_dir: &str) -> Result<(), Box<dyn std::error::Error>> {
// File appender with rotation
let file_appender = RollingFileAppender::new(
Rotation::DAILY,
log_dir,
"application.log"
);
// Console output for development
let console_layer = tracing_subscriber::fmt::layer()
.with_target(false)
.compact();
// File output for production
let file_layer = tracing_subscriber::fmt::layer()
.with_writer(file_appender)
.with_ansi(false)
.json();
tracing_subscriber::registry()
.with(console_layer)
.with(file_layer)
.with(EnvFilter::from_default_env())
.init();
Ok(())
}
```
## 📄 CONFIGURATION MANAGEMENT
### Use YAML Instead of TOML
```yaml
# config.yaml - Preferred configuration format
server:
host: "0.0.0.0"
port: 8080
workers: 4
database:
url: "postgresql://user:pass@localhost/db"
maxConnections: 20
minConnections: 5
timeoutSecs: 30
logging:
level: "info"
format: "json"
directory: "./logs"
features:
enableMetrics: true
enableTracing: true
debugMode: false
nodes:
ai:
defaultModel: "gpt-4o"
maxTokens: 4096
temperature: 0.7
crawler:
userAgent: "CellaBot/1.0"
timeout: 30
maxRetries: 3
```
### Configuration Loading Pattern
```rust
use serde::{Deserialize, Serialize};
use std::fs;
use anyhow::{Context, Result};
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct AppConfig {
pub server: ServerConfig,
pub database: DatabaseConfig,
pub logging: LoggingConfig,
pub features: FeatureConfig,
pub nodes: NodeConfig,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ServerConfig {
pub host: String,
pub port: u16,
pub workers: usize,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct DatabaseConfig {
pub url: String,
pub max_connections: u32,
pub min_connections: u32,
pub timeout_secs: u64,
}
impl AppConfig {
pub fn load() -> Result<Self> {
// Try multiple config sources in order
Self::from_file("config.yaml")
.or_else(|_| Self::from_file("config.yml"))
.or_else(|_| Self::from_env())
.context("Failed to load configuration from any source")
}
pub fn from_file(path: &str) -> Result<Self> {
let content = fs::read_to_string(path)
.with_context(|| format!("Failed to read config file: {}", path))?;
serde_yaml::from_str(&content)
.with_context(|| format!("Failed to parse config file: {}", path))
}
pub fn from_env() -> Result<Self> {
// Load from environment variables with fallbacks
Ok(Self {
server: ServerConfig {
host: std::env::var("SERVER_HOST")
.unwrap_or_else(|_| "0.0.0.0".to_string()),
port: std::env::var("SERVER_PORT")
.unwrap_or_else(|_| "8080".to_string())
.parse()
.context("Invalid SERVER_PORT")?,
workers: std::env::var("SERVER_WORKERS")
.unwrap_or_else(|_| "4".to_string())
.parse()
.context("Invalid SERVER_WORKERS")?,
},
database: DatabaseConfig {
url: std::env::var("DATABASE_URL")
.context("DATABASE_URL environment variable required")?,
max_connections: std::env::var("DB_MAX_CONNECTIONS")
.unwrap_or_else(|_| "20".to_string())
.parse()
.context("Invalid DB_MAX_CONNECTIONS")?,
min_connections: std::env::var("DB_MIN_CONNECTIONS")
.unwrap_or_else(|_| "5".to_string())
.parse()
.context("Invalid DB_MIN_CONNECTIONS")?,
timeout_secs: std::env::var("DB_TIMEOUT_SECS")
.unwrap_or_else(|_| "30".to_string())
.parse()
.context("Invalid DB_TIMEOUT_SECS")?,
},
// ... other config sections
})
}
pub fn validate(&self) -> Result<()> {
if self.server.port == 0 {
anyhow::bail!("Server port cannot be 0");
}
if self.database.url.is_empty() {
anyhow::bail!("Database URL cannot be empty");
}
if self.server.workers == 0 {
anyhow::bail!("Server workers must be greater than 0");
}
Ok(())
}
}
impl Default for AppConfig {
fn default() -> Self {
Self {
server: ServerConfig {
host: "127.0.0.1".to_string(),
port: 8080,
workers: 4,
},
database: DatabaseConfig {
url: "sqlite::memory:".to_string(),
max_connections: 20,
min_connections: 5,
timeout_secs: 30,
},
// ... other default values
}
}
}
```
## 🔧 TEMPLATING WITH MINIJINJA
### Use MiniJinja Instead of Handlebars
```rust
// ✅ Preferred: Use minijinja for templating
use minijinja::{Environment, context};
use serde_json::Value;
use anyhow::Result;
pub struct TemplateEngine {
env: Environment<'static>,
}
impl TemplateEngine {
pub fn new() -> Self {
let mut env = Environment::new();
// Add custom filters
env.add_filter("format_date", format_date_filter);
env.add_filter("truncate", truncate_filter);
env.add_filter("json_path", json_path_filter);
// Add custom functions
env.add_function("now", now_function);
env.add_function("uuid", uuid_function);
Self { env }
}
pub fn render_template(&self, template: &str, data: &Value) -> Result<String> {
// render_str compiles a one-off template without tying the source's
// lifetime to the 'static Environment
let result = self.env.render_str(template, context! { data => data })?;
Ok(result)
}
pub fn add_template(&mut self, name: &str, source: &str) -> Result<()> {
// add_template_owned (loader feature) avoids borrowing name/source
// for the Environment's 'static lifetime
self.env.add_template_owned(name.to_string(), source.to_string())?;
Ok(())
}
pub fn render_named(&self, name: &str, data: &Value) -> Result<String> {
let tmpl = self.env.get_template(name)?;
let result = tmpl.render(context! { data => data })?;
Ok(result)
}
}
// Custom filters: minijinja filters and functions operate on minijinja's own
// Value type, not serde_json::Value
use minijinja::value::Value as TmplValue;
fn format_date_filter(value: TmplValue, format: Option<String>) -> Result<String, minijinja::Error> {
// Implementation for date formatting
let _ = (value, format);
todo!()
}
fn truncate_filter(value: TmplValue, length: Option<usize>) -> Result<String, minijinja::Error> {
let text = value.as_str().unwrap_or("");
let len = length.unwrap_or(100);
if text.chars().count() <= len {
Ok(text.to_string())
} else {
// Truncate on a char boundary to avoid panicking on multi-byte text
let truncated: String = text.chars().take(len).collect();
Ok(format!("{}...", truncated))
}
}
fn json_path_filter(value: TmplValue, path: String) -> Result<TmplValue, minijinja::Error> {
// Use jsonpath-rust for extraction (round-trip through serde_json)
use jsonpath_rust::JsonPathFinder;
let json = serde_json::to_value(&value)
.map_err(|e| minijinja::Error::new(minijinja::ErrorKind::InvalidOperation, e.to_string()))?;
let finder = JsonPathFinder::from_str(&json.to_string(), &path)
.map_err(|e| minijinja::Error::new(minijinja::ErrorKind::InvalidOperation, e))?;
Ok(TmplValue::from_serialize(&finder.find()))
}
// Custom functions
fn now_function() -> Result<TmplValue, minijinja::Error> {
use chrono::Utc;
Ok(TmplValue::from(Utc::now().to_rfc3339()))
}
fn uuid_function() -> Result<TmplValue, minijinja::Error> {
use uuid::Uuid;
Ok(TmplValue::from(Uuid::new_v4().to_string()))
}
// ❌ Avoid: handlebars (less modern)
// use handlebars::Handlebars;
```
### Template Usage Examples
```rust
use serde_json::json;
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_template_rendering() {
let engine = TemplateEngine::new();
let template = r#"
Hello {{ data.name }}!
Your workflow "{{ data.workflow_name }}" has {{ data.node_count }} nodes.
Results:
{% for result in data.results %}
- {{ result.node_name }}: {{ result.status }}
{% endfor %}
Generated at: {{ now() }}
Request ID: {{ uuid() }}
"#;
let data = json!({
"name": "John Doe",
"workflow_name": "HackerNews Summary",
"node_count": 3,
"results": [
{"node_name": "RSS Feed", "status": "success"},
{"node_name": "Content Fetch", "status": "success"},
{"node_name": "AI Summary", "status": "completed"}
]
});
let result = engine.render_template(template, &data).unwrap();
assert!(result.contains("Hello John Doe!"));
assert!(result.contains("HackerNews Summary"));
}
#[test]
fn test_json_path_filter() {
let engine = TemplateEngine::new();
let template = r#"
Title: {{ data | json_path("$.title") }}
First item: {{ data | json_path("$.items[0].name") }}
"#;
let data = json!({
"title": "My Workflow",
"items": [
{"name": "First Item", "value": 1},
{"name": "Second Item", "value": 2}
]
});
let result = engine.render_template(template, &data).unwrap();
assert!(result.contains("Title: My Workflow"));
assert!(result.contains("First item: First Item"));
}
}
```
## 🔍 DATA TRANSFORMATION WITH JSONPATH
### JsonPath Integration
```rust
use jsonpath_rust::JsonPathQuery;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use anyhow::Result;
pub struct DataTransformer {
template_engine: TemplateEngine,
}
impl DataTransformer {
pub fn new() -> Self {
Self {
template_engine: TemplateEngine::new(),
}
}
/// Extract data using JSONPath
pub fn extract_json_path(&self, data: &Value, path: &str) -> Result<Value> {
// jsonpath_rust's JsonPathQuery trait adds `.path()` to serde_json::Value;
// it takes self by value, so clone the input first
data.clone()
.path(path)
.map_err(|e| anyhow::anyhow!("Failed to execute JSONPath query: {}", e))
}
/// Transform data using both JSONPath extraction and template rendering
pub fn transform(&self, input: &Value, config: &TransformConfig) -> Result<Value> {
match config {
TransformConfig::JsonPath { path } => {
self.extract_json_path(input, path)
}
TransformConfig::Template { template } => {
let rendered = self.template_engine.render_template(template, input)?;
Ok(Value::String(rendered))
}
TransformConfig::Composite { extractions, template } => {
let mut extracted_data = serde_json::Map::new();
// Extract data using JSONPath
for (key, path) in extractions {
let value = self.extract_json_path(input, path)?;
extracted_data.insert(key.clone(), value);
}
// Render template with extracted data
let data = Value::Object(extracted_data);
let rendered = self.template_engine.render_template(template, &data)?;
Ok(Value::String(rendered))
}
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase", tag = "type")]
pub enum TransformConfig {
JsonPath {
path: String,
},
Template {
template: String,
},
Composite {
extractions: std::collections::HashMap<String, String>,
template: String,
},
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
#[test]
fn test_json_path_extraction() {
let transformer = DataTransformer::new();
let data = json!({
"rss": {
"items": [
{"title": "Article 1", "url": "http://example.com/1"},
{"title": "Article 2", "url": "http://example.com/2"}
]
}
});
// Extract URLs
let urls = transformer.extract_json_path(&data, "$.rss.items[*].url").unwrap();
assert_eq!(urls, json!(["http://example.com/1", "http://example.com/2"]));
}
#[test]
fn test_composite_transformation() {
let transformer = DataTransformer::new();
let data = json!({
"articles": [
{"title": "First", "content": "Content 1"},
{"title": "Second", "content": "Content 2"}
]
});
let config = TransformConfig::Composite {
extractions: [
("titles".to_string(), "$.articles[*].title".to_string()),
("count".to_string(), "$.articles.length()".to_string()),
].into_iter().collect(),
template: "Found {{ count }} articles: {{ titles | join(', ') }}".to_string(),
};
let result = transformer.transform(&data, &config).unwrap();
let expected = "Found 2 articles: First, Second";
assert_eq!(result, Value::String(expected.to_string()));
}
}
```
## 🚨 CONFIGURATION ANTI-PATTERNS
### What to Avoid
```rust
// ❌ Don't use TOML for complex configurations
// [server]
// host = "0.0.0.0"
// port = 8080
//
// [database]
// url = "postgresql://..."
// # TOML becomes unwieldy for nested structures
// ❌ Don't use env_logger
// use env_logger;
// env_logger::init(); // Use tracing instead
// ❌ Don't use handlebars for new projects
// use handlebars::Handlebars; // Use minijinja instead
// ❌ Don't hardcode configuration values
// let database_url = "postgresql://localhost/mydb"; // Use config files/env vars
// ❌ Don't ignore configuration validation
// pub fn load_config() -> Config {
// serde_yaml::from_str(&content).unwrap() // Add proper validation
// }
```
## ✅ TOOLS & CONFIGURATION CHECKLIST
```markdown
### Tools & Configuration Verification
- [ ] Uses tracing instead of env_logger
- [ ] Configuration in YAML format (not TOML)
- [ ] All config structs use #[serde(rename_all = "camelCase")]
- [ ] Configuration validation implemented
- [ ] Environment variable fallbacks provided
- [ ] MiniJinja used for templating (not handlebars)
- [ ] JSONPath integration for data extraction
- [ ] Custom filters and functions in templates
- [ ] Structured logging with context
- [ ] File rotation for production logging
- [ ] Configuration loading from multiple sources
- [ ] Default values provided for all config options
```
This tools and configuration standard ensures modern, maintainable, and production-ready Rust applications with proper observability and flexible configuration management.