A production-ready machine learning system implementing end-to-end ML workflows: model training, a model registry, and prediction services, with comprehensive monitoring throughout.
- MLkit Package: Core ML pipeline package for training, deployment, and model management
- Training Environment: MLflow and Prefect-based pipeline for model training and deployment
- Model Registry: Centralized model storage and versioning with MinIO and MongoDB
- API Service: FastAPI-based prediction service with Redis caching and monitoring
- Tech Stack: FastAPI, MLflow, Prefect, MongoDB, MinIO, Redis, Prometheus, Grafana
Write as a package: after notebook development, all code and processes are rewritten as an installable package, which makes the system:
- Reusable and scalable
- Testable at both the code and system level
- Able to create objects dynamically for data preparation (see the Abstract Factory example below)
MLkit Package: Comprehensive toolkit for ML operations
- Training pipeline management
- Model deployment automation
- Feature engineering utilities
- Experiment tracking integration
Pipeline Orchestration
- Four main pipelines:
  - Feature Pipeline: Feature engineering and feature store updates
  - Training Pipeline: Model training and evaluation
  - Deployment Pipeline: Model conversion and registration
  - Materialize Pipeline: Feature store updates for the API side
- Configuration-driven pipeline management using YAML
- Prefect-based workflow orchestration
- MLflow experiment tracking and model logging (a minimal sketch follows this list)
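A minimal sketch of how a Prefect flow with MLflow tracking fits together; the task body and parameter names are illustrative, not the actual mlkit pipeline code (the experiment name and tracking URI follow the YAML config shown later):

```python
import mlflow
from prefect import flow, task

@task
def train_model(params: dict) -> float:
    # Placeholder training step; the real pipeline trains the configured model
    return 0.42  # e.g. a validation RMSE

@flow(name="training-pipeline")
def training_pipeline(params: dict) -> None:
    mlflow.set_tracking_uri("http://mlflow:5000")
    mlflow.set_experiment("customer_purchases")
    with mlflow.start_run():
        mlflow.log_params(params)
        mlflow.log_metric("rmse", train_model(params))

if __name__ == "__main__":
    training_pipeline({"max_depth": 6, "learning_rate": 0.1})
```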
Training & Experimentation
- MLflow experiment tracking
- Model performance logging
- Hyperparameter management
- Artifact storage (see the logging sketch below)
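The core MLflow calls behind metric, hyperparameter, and artifact logging look like this (a generic sketch; the run name, values, and artifact path are illustrative):

```python
import mlflow

with mlflow.start_run(run_name="xgboost-baseline"):
    mlflow.log_params({"max_depth": 6, "n_estimators": 300})  # hyperparameters
    mlflow.log_metric("val_rmse", 0.42)                       # performance
    mlflow.log_artifact("reports/feature_importance.png")     # any local file
```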
Model Registry
- Three-tier architecture:
  - Registry API: Model management interface
  - MongoDB: Metadata storage
  - MinIO: Model artifact storage
- Model versioning and lifecycle management
- ONNX model conversion support (a conversion sketch follows this list)
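ONNX conversion for a scikit-learn-compatible model typically goes through skl2onnx; a sketch under that assumption (the registry's actual conversion code may differ):

```python
from skl2onnx import convert_sklearn
from skl2onnx.common.data_types import FloatTensorType
from sklearn.datasets import make_regression
from sklearn.ensemble import RandomForestRegressor

# Train a small model on synthetic data for illustration
X, y = make_regression(n_samples=100, n_features=8, random_state=0)
model = RandomForestRegressor(n_estimators=10, random_state=0).fit(X, y)

# Declare the input signature: batches of float32 feature vectors
onnx_model = convert_sklearn(
    model, initial_types=[("input", FloatTensorType([None, X.shape[1]]))]
)
with open("model.onnx", "wb") as f:
    f.write(onnx_model.SerializeToString())
```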
API Service
- FastAPI-based REST API
- Redis integration for online feature serving (a serving sketch follows this list)
- MinIO for prediction data storage
- MongoDB for request logging
- Comprehensive monitoring with Prometheus and Grafana
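A minimal sketch of online feature serving with FastAPI and Redis; the endpoint path, key layout, and hash encoding are assumptions, not the service's actual API:

```python
import redis
from fastapi import FastAPI

app = FastAPI()
# decode_responses=True returns str values instead of raw bytes
cache = redis.Redis(host="redis", port=6379, decode_responses=True)

@app.get("/features/{entity_id}")
def get_features(entity_id: str) -> dict:
    # Assumes features are materialized as one Redis hash per entity
    features = cache.hgetall(f"features:{entity_id}")
    return {"entity_id": entity_id, "features": features}
```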
- Development is based on OOP
- Uses the Abstract Factory pattern for dynamic object creation:
```python
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any, Dict, Generic, Tuple, TypeVar

import polars as pl

DF = TypeVar("DF")  # generic dataframe type

# Abstract base class defining the dataframe interface
class DataFrameAdapter(ABC, Generic[DF]):
    @abstractmethod
    def read_file(self, path: Path, file_format: str, **kwargs) -> DF:
        ...

# Concrete implementation backed by Polars
class PolarsAdapter(DataFrameAdapter[pl.DataFrame]):
    def read_file(self, path: Path, file_format: str, **kwargs) -> pl.DataFrame:
        readers = {"csv": pl.read_csv, "parquet": pl.read_parquet}
        return readers[file_format](path, **kwargs)
```
```python
# Template-method base class (class name assumed): process() wraps the
# subclass hook _process_implementation() with logging and error handling
class DataProcessor(ABC, Generic[DF]):
    @abstractmethod
    def _process_implementation(self, df: DF) -> Tuple[DF, Dict[str, Any]]:
        ...

    def process(self, df: DF) -> Tuple[DF, Dict[str, Any]]:
        try:
            logger.info("Starting data processing")
            result = self._process_implementation(df)
            logger.info("Processing completed successfully")
            return result
        except Exception as e:
            logger.error(f"Processing failed: {str(e)}")
            raise
```
```python
M = TypeVar("M")  # generic model type

# Abstract Factory: the configured model_type selects the concrete adapter
class MLAdapter(Generic[M]):
    def __init__(self, config: ModelConfig):
        self.adapters = {
            "xgboost": XGBoostAdapter,
            "lightgbm": LightGBMAdapter,
            "random_forest": RandomForestAdapter,
        }
        # Instantiate the adapter matching the configured model type (sketch)
        self.adapter = self.adapters[config.model_type](config)
```
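Hypothetical usage of the factory (the `ModelConfig` fields follow the YAML config shown later):

```python
# Hypothetical usage: the config string resolves the concrete adapter at runtime
config = ModelConfig(model_type="xgboost", task_type="regression")
ml = MLAdapter(config)  # internally selects and instantiates XGBoostAdapter
```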
Configuration Management
- Type-safe configurations (a loading sketch follows this list)
- Environment-specific settings
- Easy experiment management
- Clear parameter documentation
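A sketch of how `Config.load` might be implemented with dataclasses and PyYAML (the section and field names mirror the YAML example further below; the real implementation may differ):

```python
from dataclasses import dataclass

import yaml

@dataclass
class TrackingConfig:
    experiment_name: str
    tracking_uri: str

@dataclass
class ModelConfig:
    model_type: str  # e.g. "xgboost"
    task_type: str   # e.g. "regression"

@dataclass
class Config:
    tracking: TrackingConfig
    model_config: ModelConfig

    @classmethod
    def load(cls, path: str) -> "Config":
        # Parse the YAML file and build typed config objects from it
        with open(path) as f:
            raw = yaml.safe_load(f)
        return cls(
            tracking=TrackingConfig(**raw["tracking"]),
            model_config=ModelConfig(**raw["model_config"]),
        )
```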
Logging is set up once at application start:

```python
# create_logger is assumed to be provided by the mlkit package;
# logs go to the default mlkit.logs folder
logger = create_logger()
logger.info("Starting the application")
```
- Logs are saved by default to the logs folder as mlkit.logs
- The whole project is driven through make targets during development:
```bash
# Start all services
make start

# Start specific services
make start-training

# Clean all cached files
make clean-file
```
Architecture

```mermaid
graph TB
    subgraph Training["Model Training Environment"]
        Pipeline[Pipeline Service] --> |trains and logs|MLflow[MLflow Server<br/>Experiment tracking<br/>Port: 5000]
        Pipeline --> Prefect[Prefect Server<br/>Workflow orchestration<br/>Port: 4200]
    end
    subgraph Registry["Model Registry"]
        RegAPI[Registry API<br/>Port: 8000] --> RegMinio[MinIO<br/>Model artifacts<br/>Ports: 9000/9001]
        RegAPI --> RegMongo[(MongoDB<br/>Model metadata<br/>Port: 27017)]
    end
    subgraph APIServices["API & Monitoring Stack"]
        API[FastAPI Service<br/>Port: 8000] --> APIMinio[MinIO<br/>Storage<br/>Ports: 9000/9001]
        API --> APIMongo[(MongoDB<br/>Port: 27017)]
        Prometheus[Prometheus<br/>Port: 9090] --> |metrics|API
        Grafana[Grafana<br/>Port: 3000] --> |dashboards|Prometheus
    end
    Pipeline --> |registers models|RegAPI
    API --> |fetches models|RegAPI
```
Project Structure

```
.
├── mlkit/        # ML pipeline package
├── api/          # API service
├── registry/     # Model registry service
├── project/      # Project-specific code
├── data/         # Raw and processed data
├── mlops/        # Deployment configs
├── notebooks/    # Notebooks for model and solution development
├── tests/        # Test suite
├── docs/         # Documentation
├── Makefile      # Build automation
└── .env          # Environment variables
```
API Services

| Service | Port | Purpose |
|---|---|---|
| FastAPI | 8888 | Main prediction API |
| MinIO | 9003 | Model storage |
| MongoDB | 27018 | Service data |
| Redis | 6379 | Feature cache |

Model Registry

| Service | Port | Purpose |
|---|---|---|
| Registry API | 8000 | Model management |
| MinIO | 9004 | Artifact storage |
| MongoDB | 27019 | Metadata storage |

Training Environment

| Service | Port | Purpose |
|---|---|---|
| MLflow | 5000 | Experiment tracking |
| Prefect | 4200 | Workflow orchestration |

Monitoring

| Service | Port | Purpose |
|---|---|---|
| Prometheus | 9090 | Metrics collection |
| Grafana | 3000 | Visualization |
Prerequisites
- Docker
- Docker Compose
- uv
- Python 3.11.5
You can use either uv or pip to install the dependencies:
```bash
# With uv: install everything needed to run and test the project
uv sync

# With pip
pip install -r mlops/requirements.txt
```
```bash
# Start all services
make start

# Start specific services
make start-training
make start-registry
make start-api

# Stop services
make stop

# Format code
make format

# Run linting
make lint

# Clean cached files
make clean-file
```
Example .env values:

```bash
# Service ports
API_PORT=8888
REGISTRY_PORT=8000

# Storage configuration
API_MINIO_PORT=9003
API_MONGODB_PORT=27018

# Security (example values)
MONGODB_ROOT_USERNAME=root
MINIO_ACCESS_KEY=minioadmin
```
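Services pick these values up from the environment; for local development with python-dotenv, for example (a sketch assuming the variable names above):

```python
import os

from dotenv import load_dotenv

load_dotenv()  # reads .env from the project root
api_port = int(os.environ["API_PORT"])            # 8888
registry_port = int(os.environ["REGISTRY_PORT"])  # 8000
```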
Pipeline configuration is YAML-driven, for example:

```yaml
tracking:
  experiment_name: "customer_purchases"
  tracking_uri: "http://mlflow:5000"

model_config:
  model_type: "xgboost"
  task_type: "regression"
```
Monitoring
- API performance metrics (an instrumentation sketch follows this list)
- Model prediction latency
- Resource utilization
- Error rates
- Grafana dashboards (Port 3000)
- Real-time monitoring
- Custom alert rules
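One common way to expose these metrics from FastAPI is prometheus_client (a sketch; the service's actual instrumentation, endpoint path, and metric names are assumptions):

```python
from fastapi import FastAPI
from prometheus_client import Counter, Histogram, make_asgi_app

app = FastAPI()
app.mount("/metrics", make_asgi_app())  # scraped by Prometheus (port 9090)

PREDICTIONS = Counter("predictions_total", "Number of prediction requests")
LATENCY = Histogram("prediction_latency_seconds", "Prediction latency in seconds")

@app.post("/predict")
def predict(payload: dict) -> dict:
    PREDICTIONS.inc()
    with LATENCY.time():  # records request duration into the histogram
        return {"prediction": 0.0}  # placeholder for real model inference
```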
Security
- Network isolation
- Service authentication
- Secure credential management
- Access control policies
Contributing
- Follow the PEP 8 style guide
- Add tests for new features
- Update documentation
- Submit a pull request
Example usage:

```python
# Load the project configuration
config_path = "config/pp_ml.yaml"
config = Config.load(config_path)

# Load data; data_engine is assumed to be a dataframe adapter
# such as the PolarsAdapter shown earlier
data = load_data(config=config, data_engine=data_engine)
```