Building an Adaptive NER System with MLOps: A Complete Technical Guide
Executive Summary
In this comprehensive guide, we’ll walk through building a production-grade Named Entity Recognition (NER) system that adapts to new data patterns using modern MLOps practices. This project combines rule-based classification, machine learning, unsupervised category discovery, and automated reporting in a unified pipeline that bridges R and Python ecosystems.
What we’re building:
- An intelligent text classification system that learns from transaction narratives
- Hybrid approach: rule-based NER + ML-powered adaptive learning
- Full MLOps stack with MLflow tracking and ZenML orchestration
- Bilingual pipeline (R ↔ Python) with automated R Markdown reporting
- Production-ready POC that handles concept drift and discovers new categories
Business Context: Financial institutions, e-commerce platforms, and expense management systems process millions of free-text transaction descriptions daily. Manually categorizing these is impossible at scale, yet accurate categorization is critical for fraud detection, expense reporting, budgeting, and financial analytics.
Traditional rule-based systems fail when encountering new merchants, products, or spending patterns. Our solution combines the reliability of expert-defined rules with machine learning’s adaptability, creating a system that improves continuously without manual intervention.
Table of Contents
- Architecture Overview
- Technology Stack Deep Dive
- Data Model & Processing Pipeline
- Rule-Based NER Implementation
- Machine Learning Components
- Unsupervised Category Discovery
- MLflow Integration & Model Tracking
- ZenML Orchestration
- R Integration & Interoperability
- Automated Reporting System
- Results & Performance Metrics
- Production Deployment Considerations
- Future Enhancements
Architecture Overview
System Design Philosophy
Our architecture follows a progressive enhancement strategy:
Raw Text → Rule-Based Filter → ML Classifier → Cluster Discovery → Human Review
Layer 1: Rule-Based Foundation
- Fast, deterministic, near-zero-latency classification
- Captures well-known patterns with high confidence
- No training required, interpretable results
- Coverage: ~60-70% of common transactions
Layer 2: ML Enhancement
- Handles edge cases and ambiguous text
- Learns from historical labeled data
- Amount-weighted training for financial impact
- Coverage: Additional 20-25% of transactions
Layer 3: Discovery Engine
- Unsupervised clustering of unknowns
- Identifies emerging spending patterns
- Suggests new categories for human validation
- Enables continuous system evolution
Layer 4: Human-in-the-Loop
- Low-confidence predictions flagged for review
- Discovered clusters presented for labeling
- Feedback loop retrains models automatically
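The layered flow can be sketched as a single dispatch function. This is a minimal illustration, not the project's actual API: `rule_classify`, `ml_classify`, and the threshold constants are stand-ins for the components described in the rest of this guide.

```python
from dataclasses import dataclass

UNKNOWN_THRESHOLD = 0.3  # below this, escalate from rules to the ML layer
REVIEW_THRESHOLD = 0.5   # below this, flag the result for human review

@dataclass
class Decision:
    category: str
    confidence: float
    method: str
    needs_review: bool

def classify(text, rule_classify, ml_classify):
    """Progressive enhancement: rules first, ML fallback, review flagging."""
    category, conf = rule_classify(text)              # Layer 1: deterministic rules
    method = "rule-based"
    if conf < UNKNOWN_THRESHOLD:                      # Layer 2: ML on weak matches
        ml_category, ml_conf = ml_classify(text)
        if ml_conf > conf:
            category, conf, method = ml_category, ml_conf, "ml-based"
    return Decision(category, conf, method, needs_review=conf < REVIEW_THRESHOLD)

# Toy stand-ins for the real classifiers
rules = lambda t: ("Groceries", 0.8) if "grocery" in t else ("Unknown", 0.0)
ml = lambda t: ("Transportation", 0.6) if "uber" in t else ("Unknown", 0.1)

print(classify("grocery run", rules, ml).method)      # rule-based
print(classify("uber to airport", rules, ml).method)  # ml-based
```

Layer 3 (clustering) and the review queue sit outside this hot path: anything that still comes back `Unknown` or `needs_review=True` is collected for discovery and human labeling.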
Component Architecture
┌─────────────────────────────────────────────────────────────┐
│ Data Sources │
│ (CSV, Database, API feeds, File uploads) │
└────────────────────┬────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ R: Data Preparation │
│ • Cleaning & normalization │
│ • Feature engineering │
│ • Exploratory analysis │
└────────────────────┬────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ Python: NER Classification Engine │
│ ┌──────────────────┐ ┌──────────────────┐ │
│ │ Rule-Based NER │ │ ML Classifier │ │
│ │ • Keyword match │ │ • TF-IDF │ │
│ │ • Regex patterns│ │ • Random Forest│ │
│ │ • Confidence │ │ • Probability │ │
│ └──────────────────┘ └──────────────────┘ │
│ ┌──────────────────────────────────────────┐ │
│ │ Cluster Discovery (DBSCAN) │ │
│ │ • Find unknown patterns │ │
│ │ • Suggest new categories │ │
│ └──────────────────────────────────────────┘ │
└────────────────────┬────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ MLflow: Experiment Tracking │
│ • Model versioning │
│ • Metrics logging │
│ • Artifact storage │
│ • Model registry │
└────────────────────┬────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ ZenML: Pipeline Orchestration │
│ • Step dependencies │
│ • Caching & lineage │
│ • Scheduled runs │
│ • Deployment automation │
└────────────────────┬────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ R Markdown: Automated Reporting │
│ • Performance dashboards │
│ • Category distribution │
│ • Confidence analysis │
│ • Review recommendations │
└─────────────────────────────────────────────────────────────┘
Technology Stack Deep Dive
Core Technologies & Rationale
Python 3.9+
- Primary ML/NLP engine
- Rich ecosystem: scikit-learn, NLTK, spaCy
- MLflow & ZenML native support
- Industry standard for production ML
R 4.0+
- Data preparation & reporting
- Superior statistical analysis
- Excellent visualization (ggplot2, plotly)
- R Markdown for reproducible reports
- Strong in financial analytics community
MLflow 2.9+
- Experiment tracking & model registry
- Framework-agnostic tracking
- Model versioning with lineage
- REST API for model serving
- Local SQLite backend (production: PostgreSQL)
Why MLflow?
# Simple, powerful tracking
with mlflow.start_run():
    mlflow.log_param("n_estimators", 100)
    mlflow.log_metric("accuracy", 0.94)
    mlflow.sklearn.log_model(model, "model")
ZenML 0.50+
- Pipeline orchestration
- Step caching for efficiency
- Lineage tracking
- Multi-cloud deployment
- Integrates with MLflow seamlessly
Why ZenML?
- Declarative pipeline definition
- Automatic artifact versioning
- Reproducible experiments
- Easy scaling to Kubernetes
Reticulate
- R ↔ Python bridge
- Seamless data transfer
- Call Python from R naturally
- Share objects between languages
Dependencies & Environment
Python Requirements:
pandas==2.1.0 # Data manipulation
numpy==1.24.0 # Numerical computing
scikit-learn==1.3.0 # ML algorithms
mlflow==2.9.0 # Experiment tracking
zenml==0.50.0 # Pipeline orchestration
pyyaml==6.0 # Configuration files
joblib==1.3.0 # Model serialization
R Dependencies:
tidyverse # Data wrangling (dplyr, ggplot2, etc.)
reticulate # Python integration
knitr # Report generation
rmarkdown # Document formatting
DT # Interactive tables
plotly # Interactive visualizations
yaml # Config parsing
Data Model & Processing Pipeline
Input Data Schema
Transaction {
narration: str # Free-text description
amount: float # Transaction amount (signed)
date: datetime # Transaction timestamp
account_id: str # Optional: account identifier
merchant_id: str # Optional: merchant code
}
Example Transaction Data:
narration,amount,date
"Purchase at Baby Store - Pampers diapers",45.99,2026-01-15
"Pharmacy - Baby lotion and wipes",23.50,2026-01-16
"Supermarket - Bread milk eggs cheese",67.80,2026-01-16
"Uber ride to downtown conference",28.00,2026-01-17
"Dr. Smith consultation fee",150.00,2026-01-18
"Shell Gas Station #4521",55.20,2026-01-19
"Payment to ACME CORP INV-2024-001",1200.00,2026-01-20
Output Schema
ClassifiedTransaction {
narration: str # Original text
amount: float # Original amount
category: str # Assigned category
confidence: float # Classification confidence [0-1]
method: str # 'rule-based' | 'ml-based'
keywords_matched: List[str] # Matched keywords (if rule-based)
probability_dist: Dict # Class probabilities (if ML)
needs_review: bool # Flag for human review
cluster_id: int # Discovered cluster (if unknown)
}
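For reference, the output schema can be expressed as a Python dataclass. Field names follow the schema above; the defaults and the sample record are illustrative, not part of the spec:

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional

@dataclass
class ClassifiedTransaction:
    narration: str                      # original free text
    amount: float                       # original signed amount
    category: str                       # assigned category
    confidence: float                   # classification confidence in [0, 1]
    method: str                         # 'rule-based' | 'ml-based'
    keywords_matched: List[str] = field(default_factory=list)         # rule-based only
    probability_dist: Dict[str, float] = field(default_factory=dict)  # ML only
    needs_review: bool = False          # flag for human review
    cluster_id: Optional[int] = None    # discovered cluster, if unknown

t = ClassifiedTransaction(
    narration="pharmacy baby lotion and wipes",
    amount=23.50,
    category="Baby Items",
    confidence=0.40,
    method="rule-based",
    keywords_matched=["baby lotion", "wipes"],
)
print(t.needs_review)  # False by default; the classifier sets it from its threshold
```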
Data Preprocessing Pipeline
R: Initial Data Preparation
# src/R/data_prep.R
library(tidyverse)
library(lubridate)
prepare_transaction_data <- function(input_path, output_path) {
df <- read_csv(input_path) %>%
mutate(
# Text normalization
narration = str_trim(narration) %>%
str_to_lower() %>%
str_squish() %>% # Remove extra whitespace
str_replace_all("[^a-z0-9\\s]", " "), # Remove special chars
# Amount validation
amount = as.numeric(amount),
amount_abs = abs(amount),
# Date parsing
date = ymd(date),
# Derived features
is_large_transaction = amount_abs > 500,
transaction_type = if_else(amount >= 0, "credit", "debit"),
# Text features
word_count = str_count(narration, "\\S+"),
has_numbers = str_detect(narration, "\\d"),
# Create unique ID
transaction_id = row_number()
) %>%
filter(
!is.na(narration),
!is.na(amount),
nchar(narration) > 3 # Minimum text length
)
# Log preprocessing stats
cat("Preprocessing Summary:\n")
cat(" Total records:", nrow(df), "\n")
  cat("  Date range:", format(min(df$date)), "to", format(max(df$date)), "\n")
  cat("  Amount range: $", min(df$amount), "to $", max(df$amount), "\n")
cat(" Avg words per narration:", mean(df$word_count), "\n")
# Save cleaned data
write_csv(df, output_path)
return(df)
}
# Feature engineering for analysis
engineer_features <- function(df) {
df %>%
mutate(
# Temporal features
day_of_week = wday(date, label = TRUE),
is_weekend = day_of_week %in% c("Sat", "Sun"),
month = month(date, label = TRUE),
# Amount buckets
amount_bucket = case_when(
amount_abs < 10 ~ "micro",
amount_abs < 50 ~ "small",
amount_abs < 200 ~ "medium",
amount_abs < 1000 ~ "large",
TRUE ~ "very_large"
),
# Text complexity
text_complexity = case_when(
word_count <= 3 ~ "simple",
word_count <= 6 ~ "moderate",
TRUE ~ "complex"
)
)
}
Preprocessing Rationale:
- Lowercase normalization: Ensures "Pharmacy" and "pharmacy" match
- Special character removal: Reduces noise, improves keyword matching
- Amount features: Transaction size influences categorization importance
- Text complexity: Longer descriptions often more specific/categorizable
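When data arrives without passing through the R prep step, the Python side can apply the same normalization. A sketch of the equivalent cleaning (the function name is ours, not from the pipeline; the regexes mirror the `str_replace_all` and `str_squish` calls above):

```python
import re

def normalize_narration(text: str) -> str:
    """Mirror the R cleaning: trim, lowercase, drop special chars, squish spaces."""
    text = text.strip().lower()
    text = re.sub(r"[^a-z0-9\s]", " ", text)   # remove special characters
    text = re.sub(r"\s+", " ", text).strip()   # collapse repeated whitespace
    return text

print(normalize_narration("  Purchase at Baby-Store: Pampers!! "))
# purchase at baby store pampers
```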
Rule-Based NER Implementation
Keyword Configuration
Our rule-based system uses a YAML configuration file for maintainability and non-developer editability:
# models/keyword_rules.yaml
categories:
Baby Items:
keywords:
- pampers
- diapers
- baby powder
- baby lotion
- wipes
- formula
- baby food
- onesie
- stroller
- crib
weight: 1.0
aliases: ["infant products", "nursery"]
Groceries:
keywords:
- supermarket
- grocery
- bread
- milk
- eggs
- cheese
- meat
- vegetables
- fruit
- walmart
- costco
- whole foods
weight: 1.0
aliases: ["food shopping", "provisions"]
Healthcare:
keywords:
- doctor
- pharmacy
- cvs
- walgreens
- medicine
- prescription
- clinic
- hospital
- medical
- dentist
- optometrist
weight: 1.5 # Higher weight for important category
aliases: ["medical", "health services"]
Transportation:
keywords:
- uber
- lyft
- taxi
- fuel
- gas
- parking
- metro
- train
- bus fare
- toll
weight: 1.0
aliases: ["travel", "commute"]
Utilities:
keywords:
- electric
- water bill
- gas bill
- internet
- phone bill
- verizon
- comcast
- att
weight: 1.2
aliases: ["bills", "services"]
Entertainment:
keywords:
- netflix
- spotify
- hulu
- disney plus
- movie
- cinema
- theater
- concert
- game
weight: 0.8
aliases: ["leisure", "recreation"]
# Matching configuration
matching:
min_confidence: 0.3
partial_match_penalty: 0.5
multi_word_bonus: 1.2
# Thresholds
unknown_threshold: 0.3 # Below this → ML classification
review_threshold: 0.5 # Below this → human review
Python NER Classifier Implementation
# src/python/ner_classifier.py
import pandas as pd
import numpy as np
import yaml
import re
from typing import Dict, List, Tuple, Optional
from pathlib import Path
class AdaptiveNERClassifier:
"""
Hybrid NER classifier combining rule-based and ML approaches
with unsupervised category discovery.
"""
def __init__(self, rules_path: str = "models/keyword_rules.yaml"):
"""Initialize classifier with keyword rules."""
self.rules_path = Path(rules_path)
self.load_rules()
# ML components (initialized later)
self.vectorizer = None
self.ml_classifier = None
self.cluster_model = None
# Tracking
self.discovered_categories = {}
self.classification_stats = {
'rule_based': 0,
'ml_based': 0,
'unknown': 0
}
def load_rules(self):
"""Load keyword rules from YAML config."""
with open(self.rules_path, 'r') as f:
config = yaml.safe_load(f)
self.categories = config['categories']
self.matching_config = config['matching']
self.unknown_threshold = config['unknown_threshold']
self.review_threshold = config['review_threshold']
# Precompile regex patterns for efficiency
self._compile_patterns()
def _compile_patterns(self):
"""Compile regex patterns for each keyword."""
self.patterns = {}
for category, info in self.categories.items():
patterns = []
for keyword in info['keywords']:
# Word boundary matching for precision
pattern = r'\b' + re.escape(keyword) + r'\b'
patterns.append(re.compile(pattern, re.IGNORECASE))
self.patterns[category] = patterns
def keyword_match(self, text: str) -> Tuple[str, float, List[str]]:
"""
Rule-based keyword matching with confidence scoring.
Returns:
(category, confidence, matched_keywords)
"""
text_lower = text.lower()
text_words = set(text_lower.split())
matches = {}
matched_kw = {}
for category, patterns in self.patterns.items():
match_count = 0
category_matches = []
for pattern, keyword in zip(patterns,
self.categories[category]['keywords']):
if pattern.search(text):
match_count += 1
category_matches.append(keyword)
if match_count > 0:
# Weight by category importance
weight = self.categories[category]['weight']
# Bonus for multiple keyword matches
if match_count > 1:
weight *= self.matching_config['multi_word_bonus']
matches[category] = match_count * weight
matched_kw[category] = category_matches
if not matches:
return "Unknown", 0.0, []
# Best matching category
best_category = max(matches, key=matches.get)
# Confidence based on match strength relative to text length
raw_score = matches[best_category]
text_length = len(text_words)
confidence = min(raw_score / max(text_length, 1), 1.0)
return best_category, confidence, matched_kw[best_category]
def classify_single(self, text: str, amount: float = None) -> Dict:
"""
Classify a single transaction.
Args:
text: Transaction narration
amount: Transaction amount (optional, for weighted decisions)
Returns:
Classification result dictionary
"""
# Rule-based classification
category, confidence, keywords = self.keyword_match(text)
result = {
'narration': text,
'amount': amount,
'category': category,
'confidence': confidence,
'method': 'rule-based',
'keywords_matched': keywords,
'needs_review': confidence < self.review_threshold
}
# If low confidence and ML model available, try ML
if confidence < self.unknown_threshold and self.ml_classifier is not None:
ml_result = self._ml_classify_single(text)
# Use ML if more confident
if ml_result['confidence'] > confidence:
result.update(ml_result)
result['method'] = 'ml-based'
result['fallback_from'] = 'rule-based'
        if result['category'] == 'Unknown':
            self.classification_stats['unknown'] += 1
        else:
            self.classification_stats[
                'rule_based' if result['method'] == 'rule-based' else 'ml_based'
            ] += 1
        return result
def classify_batch(self, df: pd.DataFrame) -> pd.DataFrame:
"""
Classify a batch of transactions efficiently.
Args:
df: DataFrame with 'narration' and 'amount' columns
Returns:
DataFrame with classification results
"""
results = []
for idx, row in df.iterrows():
result = self.classify_single(
row['narration'],
row.get('amount', None)
)
results.append(result)
return pd.DataFrame(results)
    def get_stats(self) -> Dict:
        """Get classification statistics."""
        total = sum(self.classification_stats.values())
        if total == 0:
            return {'total_classified': 0}
        return {
            'total_classified': total,
            'rule_based_pct': self.classification_stats['rule_based'] / total * 100,
            'ml_based_pct': self.classification_stats['ml_based'] / total * 100,
            'unknown_pct': self.classification_stats['unknown'] / total * 100
        }
Rule-Based Classification Algorithm
Step-by-Step Process:
1. Text Normalization
   text_lower = text.lower()
   text_words = set(text_lower.split())
2. Pattern Matching
   - Iterate through all category patterns
   - Use compiled regex for speed
   - Count matches per category
3. Scoring
   score = match_count * category_weight * multi_word_bonus
4. Confidence Calculation
   confidence = min(score / text_length, 1.0)
5. Decision Logic
   - If confidence ≥ unknown_threshold → Accept rule-based classification
   - If confidence < unknown_threshold → Try ML classifier
   - If confidence < review_threshold → Flag for human review
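A worked example makes the arithmetic concrete. The snippet below re-implements just these five steps on one narration, with keywords and weights echoing the sample YAML (a sketch, not the full classifier):

```python
import re

categories = {
    "Baby Items": {"keywords": ["baby lotion", "wipes", "pampers"], "weight": 1.0},
    "Healthcare": {"keywords": ["pharmacy", "doctor"], "weight": 1.5},
}
MULTI_WORD_BONUS = 1.2

def score(text: str):
    text_words = set(text.lower().split())
    scores = {}
    for cat, info in categories.items():
        hits = [kw for kw in info["keywords"]
                if re.search(r"\b" + re.escape(kw) + r"\b", text, re.IGNORECASE)]
        if hits:
            s = len(hits) * info["weight"]
            if len(hits) > 1:
                s *= MULTI_WORD_BONUS          # bonus for multiple keyword matches
            scores[cat] = s
    best = max(scores, key=scores.get)
    confidence = min(scores[best] / max(len(text_words), 1), 1.0)
    return best, round(confidence, 2), scores

print(score("Pharmacy - Baby lotion and wipes"))
# → ('Baby Items', 0.4, {'Baby Items': 2.4, 'Healthcare': 1.5})
```

Two Baby Items hits (score 2 × 1.0 × 1.2 = 2.4) beat one weighted Healthcare hit (1 × 1.5), and dividing by the six tokens gives confidence 0.4: above `unknown_threshold` (0.3), so the rule-based label is accepted, but below `review_threshold` (0.5), so it is still flagged for review.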
Performance Characteristics:
- Speed: ~0.1ms per transaction
- Accuracy: 85-90% on known patterns
- Interpretability: Full keyword traceability
- Maintenance: Easy keyword updates via YAML
Machine Learning Components
Feature Engineering for ML
# src/python/feature_engineering.py
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.preprocessing import StandardScaler
import numpy as np
class TransactionFeaturizer:
"""Extract features from transaction text and metadata."""
def __init__(self, max_features=500, ngram_range=(1, 3)):
self.tfidf = TfidfVectorizer(
max_features=max_features,
ngram_range=ngram_range,
min_df=2, # Ignore very rare terms
max_df=0.8, # Ignore very common terms
sublinear_tf=True, # Use log scaling
stop_words='english'
)
self.amount_scaler = StandardScaler()
self.fitted = False
def fit_transform(self, df: pd.DataFrame) -> np.ndarray:
"""Fit and transform features."""
# Text features
text_features = self.tfidf.fit_transform(df['narration'])
# Numerical features
numerical = self._extract_numerical_features(df)
numerical_scaled = self.amount_scaler.fit_transform(numerical)
# Combine
features = np.hstack([
text_features.toarray(),
numerical_scaled
])
self.fitted = True
return features
def transform(self, df: pd.DataFrame) -> np.ndarray:
"""Transform new data using fitted transformers."""
if not self.fitted:
raise ValueError("Featurizer not fitted. Call fit_transform first.")
text_features = self.tfidf.transform(df['narration'])
numerical = self._extract_numerical_features(df)
numerical_scaled = self.amount_scaler.transform(numerical)
return np.hstack([
text_features.toarray(),
numerical_scaled
])
def _extract_numerical_features(self, df: pd.DataFrame) -> np.ndarray:
"""Extract numerical features from transactions."""
features = []
# Amount features
features.append(df['amount'].abs().values.reshape(-1, 1))
features.append(np.log1p(df['amount'].abs()).values.reshape(-1, 1))
# Text length features
features.append(df['narration'].str.len().values.reshape(-1, 1))
features.append(df['narration'].str.split().str.len().values.reshape(-1, 1))
# Character diversity
features.append(
df['narration'].apply(lambda x: len(set(x)) / max(len(x), 1))
.values.reshape(-1, 1)
)
return np.hstack(features)
Random Forest Classifier
# src/python/train_model.py (ML section)
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import classification_report, confusion_matrix
import mlflow.sklearn
class MLClassifierTrainer:
"""Train and evaluate ML classifier."""
def __init__(self):
self.featurizer = TransactionFeaturizer()
self.classifier = RandomForestClassifier(
n_estimators=100,
max_depth=15,
min_samples_split=10,
min_samples_leaf=4,
max_features='sqrt',
class_weight='balanced', # Handle class imbalance
random_state=42,
n_jobs=-1 # Use all CPU cores
)
def train(self, df: pd.DataFrame):
"""
Train classifier on labeled data.
Args:
df: DataFrame with 'narration', 'amount', 'category' columns
"""
# Filter out Unknown categories
train_df = df[df['category'] != 'Unknown'].copy()
if len(train_df) < 20:
print("⚠️ Insufficient training data. Need at least 20 labeled samples.")
return False
print(f"Training on {len(train_df)} samples across {train_df['category'].nunique()} categories")
# Extract features
X = self.featurizer.fit_transform(train_df)
y = train_df['category']
# Amount-based sample weighting
# Give more weight to high-value transactions
sample_weights = np.log1p(train_df['amount'].abs())
sample_weights = sample_weights / sample_weights.sum()
# Train-test split
X_train, X_test, y_train, y_test, w_train, w_test = train_test_split(
X, y, sample_weights,
test_size=0.2,
random_state=42,
stratify=y
)
# Train model
self.classifier.fit(X_train, y_train, sample_weight=w_train)
# Evaluate
train_score = self.classifier.score(X_train, y_train)
test_score = self.classifier.score(X_test, y_test)
# Cross-validation
cv_scores = cross_val_score(
self.classifier, X_train, y_train,
cv=5, scoring='f1_weighted'
)
print(f"✓ Training accuracy: {train_score:.3f}")
print(f"✓ Test accuracy: {test_score:.3f}")
print(f"✓ CV F1 score: {cv_scores.mean():.3f} (+/- {cv_scores.std():.3f})")
# Detailed classification report
y_pred = self.classifier.predict(X_test)
print("\nClassification Report:")
print(classification_report(y_test, y_pred))
return True
def predict(self, df: pd.DataFrame) -> pd.DataFrame:
"""Predict categories for new transactions."""
X = self.featurizer.transform(df)
predictions = self.classifier.predict(X)
probabilities = self.classifier.predict_proba(X)
# Get confidence (max probability)
confidences = probabilities.max(axis=1)
# Get full probability distribution
prob_dists = [
dict(zip(self.classifier.classes_, probs))
for probs in probabilities
]
result_df = df.copy()
result_df['category'] = predictions
result_df['confidence'] = confidences
result_df['probability_dist'] = prob_dists
result_df['method'] = 'ml-based'
return result_df
Why Random Forest?
- Handles mixed features: Text (TF-IDF) + numerical (amounts)
- Robust to noise: Tree averaging reduces overfitting
- Feature importance: Interpretable results
- No scaling needed: Trees are scale-invariant
- Built-in confidence: Probability estimates from tree votes
Hyperparameter Rationale:
- n_estimators=100: Balance between performance and training time
- max_depth=15: Prevent overfitting on noisy text data
- min_samples_split=10: Require sufficient samples for splits
- class_weight='balanced': Handle imbalanced categories
- max_features='sqrt': Standard heuristic for classification
Amount-Weighted Training
Key innovation: Not all transactions are equally important.
# High-value transactions get more weight
sample_weights = np.log1p(train_df['amount'].abs())
sample_weights = sample_weights / sample_weights.sum()
# Result: a $1000 transaction carries roughly 1.5x the influence of a $100 one (log1p scale)
Business Logic:
- $5 coffee miscategorization: Minor impact
- $5000 invoice miscategorization: Major impact
- Model learns to be more careful with large amounts
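A quick numeric check of the `log1p` weighting (illustrative amounts):

```python
import numpy as np

amounts = np.array([5.0, 100.0, 1000.0, 5000.0])
weights = np.log1p(np.abs(amounts))       # log(1 + |amount|)
print(np.round(weights, 2))               # [1.79 4.62 6.91 8.52]
print(round(weights[2] / weights[1], 2))  # 1.5: $1000 txn vs $100 txn influence
```

The compression is the point: raw amounts would let a handful of very large invoices dominate training, while `log1p` keeps them influential but bounded.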
Unsupervised Category Discovery
DBSCAN Clustering for Unknown Transactions
When transactions don’t match existing categories, we use clustering to discover new patterns:
# src/python/category_discovery.py
from sklearn.cluster import DBSCAN
from sklearn.metrics import silhouette_score
from collections import Counter
import numpy as np
class CategoryDiscovery:
"""Discover new categories from unknown transactions using clustering."""
def __init__(self, min_cluster_size=3, eps=0.3):
self.min_cluster_size = min_cluster_size
self.eps = eps
self.featurizer = TransactionFeaturizer(max_features=200)
def discover_categories(self, unknown_texts: List[str]) -> Dict:
"""
Cluster unknown transactions to discover potential new categories.
Args:
unknown_texts: List of unclassified transaction narrations
Returns:
Dictionary of discovered clusters with sample texts
"""
if len(unknown_texts) < self.min_cluster_size:
print(f"⚠️ Need at least {self.min_cluster_size} unknown transactions for clustering")
return {}
print(f"Analyzing {len(unknown_texts)} unknown transactions...")
# Create temporary DataFrame for featurization
temp_df = pd.DataFrame({
'narration': unknown_texts,
'amount': [0] * len(unknown_texts) # Dummy amounts
})
# Extract features
X = self.featurizer.fit_transform(temp_df)
# DBSCAN clustering
# eps: maximum distance between samples in same cluster
# min_samples: minimum cluster size
clustering = DBSCAN(
eps=self.eps,
min_samples=self.min_cluster_size,
metric='cosine', # Good for text similarity
n_jobs=-1
)
        labels = clustering.fit_predict(X)
        self.labels_ = labels  # keep the assignments for later reuse (e.g. visualization)
# Analyze clusters
unique_labels = set(labels)
n_clusters = len(unique_labels) - (1 if -1 in unique_labels else 0)
n_noise = list(labels).count(-1)
print(f"✓ Found {n_clusters} potential new categories")
print(f" {n_noise} transactions remain as noise")
if n_clusters > 0:
silhouette = silhouette_score(X, labels, metric='cosine')
print(f" Silhouette score: {silhouette:.3f}")
# Extract cluster information
discovered = {}
for label in unique_labels:
if label == -1: # Noise cluster
continue
# Get texts in this cluster
cluster_mask = (labels == label)
cluster_texts = [unknown_texts[i] for i, m in enumerate(cluster_mask) if m]
# Analyze cluster
cluster_info = self._analyze_cluster(cluster_texts)
discovered[f"NewCategory_{label}"] = {
'sample_texts': cluster_texts[:10], # First 10 examples
'size': len(cluster_texts),
'keywords': cluster_info['top_keywords'],
'suggested_name': cluster_info['suggested_name']
}
return discovered
def _analyze_cluster(self, texts: List[str]) -> Dict:
"""Analyze a cluster to extract keywords and suggest a name."""
# Combine all texts
combined = ' '.join(texts)
words = combined.lower().split()
# Count word frequency
word_counts = Counter(words)
# Remove common stop words
stop_words = {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for'}
word_counts = {w: c for w, c in word_counts.items()
if w not in stop_words and len(w) > 2}
# Top keywords
top_keywords = [w for w, c in word_counts.most_common(5)]
# Suggest category name based on most common keyword
if top_keywords:
suggested_name = top_keywords[0].title() + " Related"
else:
suggested_name = "Miscellaneous"
return {
'top_keywords': top_keywords,
'suggested_name': suggested_name
}
    def visualize_clusters(self, unknown_texts: List[str],
                           labels: np.ndarray = None,
                           save_path: str = None):
        """Visualize clusters using t-SNE dimensionality reduction."""
        from sklearn.manifold import TSNE
        import matplotlib.pyplot as plt
        if labels is None:
            # Reuse the assignments stored by the last discover_categories() call
            labels = getattr(self, 'labels_', None)
temp_df = pd.DataFrame({
'narration': unknown_texts,
'amount': [0] * len(unknown_texts)
})
X = self.featurizer.transform(temp_df)
# Reduce to 2D for visualization
tsne = TSNE(n_components=2, random_state=42, perplexity=min(30, len(X)-1))
X_2d = tsne.fit_transform(X)
# Plot
plt.figure(figsize=(12, 8))
scatter = plt.scatter(X_2d[:, 0], X_2d[:, 1],
c=labels, cmap='tab10',
alpha=0.6, s=100)
plt.colorbar(scatter)
plt.title('Discovered Category Clusters (t-SNE Visualization)')
plt.xlabel('Dimension 1')
plt.ylabel('Dimension 2')
if save_path:
plt.savefig(save_path, dpi=300, bbox_inches='tight')
plt.show()
DBSCAN Parameter Selection
eps (epsilon): Maximum distance between points in same cluster
- Text similarity typically 0.2-0.4
- Lower = tighter, more conservative clusters
- Higher = looser, more permissive clusters
min_samples: Minimum cluster size
- Set to 3-5 for transaction data
- Prevents overfitting to noise
- Requires pattern repetition to count as category
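In practice `eps` is best chosen empirically; a small sweep with a cluster-count readout is enough for a first pass. A sketch on toy narrations (the texts and the 0.3-0.7 range are illustrative):

```python
from sklearn.cluster import DBSCAN
from sklearn.feature_extraction.text import TfidfVectorizer

texts = [
    "geico auto insurance premium", "state farm insurance premium payment",
    "allstate insurance premium", "netflix monthly subscription",
    "spotify monthly subscription fee", "hulu monthly subscription",
]
X = TfidfVectorizer().fit_transform(texts)

for eps in (0.3, 0.4, 0.5, 0.6, 0.7):
    labels = DBSCAN(eps=eps, min_samples=2, metric="cosine").fit_predict(X)
    n_clusters = len(set(labels)) - (1 if -1 in labels else 0)
    print(f"eps={eps}: {n_clusters} clusters, {list(labels).count(-1)} noise points")
```

Too small an eps leaves everything as noise; in this toy example the insurance and subscription themes separate into two clusters at the larger eps values. When several settings look plausible, the silhouette score logged by the discovery step is a more principled tiebreak.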
Example Discovery Output:
{
"NewCategory_0": {
"size": 12,
"keywords": ["insurance", "policy", "premium", "geico", "coverage"],
"suggested_name": "Insurance Related",
"sample_texts": [
"geico auto insurance monthly premium",
"state farm policy renewal payment",
"allstate insurance payment confirmation"
]
},
"NewCategory_1": {
"size": 8,
"keywords": ["subscription", "monthly", "membership", "fee"],
"suggested_name": "Subscription Related",
"sample_texts": [
"linkedin premium monthly subscription",
"amazon prime membership renewal",
"new york times digital subscription"
]
}
}
MLflow Integration & Model Tracking
Experiment Tracking Setup
# src/python/train_model.py
import mlflow
import mlflow.sklearn
from pathlib import Path
import json
def setup_mlflow(experiment_name="NER-Classification",
tracking_uri="./mlruns"):
"""Configure MLflow tracking."""
mlflow.set_tracking_uri(tracking_uri)
mlflow.set_experiment(experiment_name)
# Auto-log sklearn metrics
mlflow.sklearn.autolog(
log_models=True,
log_input_examples=True,
log_model_signatures=True
)
def train_and_log_model(data_path: str,
experiment_name: str = "NER-Classification"):
"""
Complete training pipeline with MLflow tracking.
"""
setup_mlflow(experiment_name)
# Load data
df = pd.read_csv(data_path)
with mlflow.start_run(run_name=f"training_{pd.Timestamp.now():%Y%m%d_%H%M%S}"):
# Log data info
mlflow.log_param("data_path", data_path)
mlflow.log_param("total_records", len(df))
mlflow.log_param("date_range", f"{df['date'].min()} to {df['date'].max()}")
# Initialize classifier
classifier = AdaptiveNERClassifier()
# Phase 1: Rule-based classification
print("\n=== Phase 1: Rule-Based Classification ===")
classified_df = classifier.classify_batch(df)
rule_coverage = (classified_df['category'] != 'Unknown').sum() / len(df)
rule_avg_confidence = classified_df[
classified_df['category'] != 'Unknown'
]['confidence'].mean()
mlflow.log_metric("rule_based_coverage", rule_coverage)
mlflow.log_metric("rule_based_avg_confidence", rule_avg_confidence)
print(f"✓ Rule-based coverage: {rule_coverage:.2%}")
# Log category distribution
category_dist = classified_df['category'].value_counts().to_dict()
mlflow.log_dict(category_dist, "rule_based_category_distribution.json")
# Phase 2: Category Discovery
print("\n=== Phase 2: Category Discovery ===")
discovery = CategoryDiscovery()
unknown_texts = classified_df[
classified_df['category'] == 'Unknown'
]['narration'].tolist()
new_categories = discovery.discover_categories(unknown_texts)
mlflow.log_metric("unknown_count", len(unknown_texts))
mlflow.log_metric("discovered_clusters", len(new_categories))
if new_categories:
mlflow.log_dict(new_categories, "discovered_categories.json")
# Create visualization
            discovery.visualize_clusters(
                unknown_texts,
                labels=None,
                save_path="cluster_visualization.png"
            )
mlflow.log_artifact("cluster_visualization.png")
# Phase 3: ML Training
print("\n=== Phase 3: ML Model Training ===")
ml_trainer = MLClassifierTrainer()
training_success = ml_trainer.train(classified_df)
if training_success:
# Re-classify with ML model
final_df = ml_trainer.predict(df)
final_coverage = (final_df['category'] != 'Unknown').sum() / len(df)
final_avg_confidence = final_df['confidence'].mean()
mlflow.log_metric("final_coverage", final_coverage)
mlflow.log_metric("final_avg_confidence", final_avg_confidence)
mlflow.log_metric("ml_improvement", final_coverage - rule_coverage)
print(f"✓ Final coverage: {final_coverage:.2%}")
print(f"✓ Improvement: {(final_coverage - rule_coverage):.2%}")
# Feature importance analysis
feature_importance = ml_trainer.classifier.feature_importances_
top_features_idx = feature_importance.argsort()[-20:][::-1]
feature_names = ml_trainer.featurizer.tfidf.get_feature_names_out()
top_features = {
str(feature_names[i]): float(feature_importance[i])
for i in top_features_idx
}
mlflow.log_dict(top_features, "top_features.json")
        # Save models (serialized with joblib, already a project dependency)
        import joblib
        joblib.dump(classifier, "models/ner_classifier.pkl")
        mlflow.log_artifact("models/ner_classifier.pkl")
# Save predictions
final_df.to_csv("data/processed/classified_transactions.csv", index=False)
mlflow.log_artifact("data/processed/classified_transactions.csv")
        # Business metric: share of total transaction value that received a category
        amount_weighted_coverage = (
            final_df[final_df['category'] != 'Unknown']['amount'].abs().sum() /
            df['amount'].abs().sum()
        )
        mlflow.log_metric("amount_weighted_coverage", amount_weighted_coverage)
# Low confidence analysis
low_conf_count = (final_df['confidence'] < 0.5).sum()
mlflow.log_metric("low_confidence_count", low_conf_count)
        mlflow.log_metric("review_required_pct", low_conf_count / len(df) * 100)
print(f"\n✓ Model saved. Run ID: {mlflow.active_run().info.run_id}")
print(f"✓ {low_conf_count} transactions flagged for review")
return classifier, final_df
else:
print("⚠️ ML training skipped due to insufficient data")
return classifier, classified_df
if __name__ == "__main__":
import sys
data_path = sys.argv[1] if len(sys.argv) > 1 else "data/sample_transactions.csv"
train_and_log_model(data_path)
MLflow Tracking Dashboard
Once you run the training script, launch the MLflow UI:
mlflow ui --port 5000
Navigate to http://localhost:5000 to see:
Experiment Overview:
- All training runs with timestamps
- Sortable by metrics (coverage, accuracy, etc.)
- Comparison view for multiple runs
Run Details:
- Parameters: data path, record count, date range
- Metrics: coverage rates, confidence scores, improvements
- Artifacts: models, visualizations, JSON reports
- Model signature: input/output schema
Model Registry:
- Version history
- Stage management (staging, production)
- Deployment metadata
- Model lineage
Model Versioning Strategy
# Register model in the MLflow Model Registry
mlflow.sklearn.log_model(
    classifier,
    "ner_classifier",
    registered_model_name="TransactionNER"
)

# Promote a specific version to Production
client = mlflow.tracking.MlflowClient()
client.transition_model_version_stage(
    name="TransactionNER",
    version=3,
    stage="Production"
)
Version Lifecycle:
- None: Newly trained model
- Staging: Under validation
- Production: Actively serving predictions
- Archived: Superseded by newer version
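Rather than promoting versions by hand, stage transitions can be gated on metrics. A minimal sketch of such a gate, where the metric names (`coverage`, `avg_confidence`) and thresholds are illustrative assumptions, not part of the pipeline above:

```python
# Hypothetical promotion gate: approve a candidate model only if it clears
# an absolute floor and does not regress too far against production.
def should_promote(candidate: dict, production: dict,
                   min_coverage: float = 0.80,
                   max_regression: float = 0.02) -> bool:
    """Return True if the candidate clears the coverage floor and stays
    within `max_regression` of the production model on each metric."""
    if candidate["coverage"] < min_coverage:
        return False  # hard floor on coverage
    for metric in ("coverage", "avg_confidence"):
        if candidate[metric] < production[metric] - max_regression:
            return False  # unacceptable regression vs. production
    return True

candidate = {"coverage": 0.91, "avg_confidence": 0.84}
production = {"coverage": 0.89, "avg_confidence": 0.86}
print(should_promote(candidate, production))  # True: within tolerance
```

A gate like this would sit between logging the candidate and calling `transition_model_version_stage`, so a worse model never reaches Production automatically.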
ZenML Orchestration
Pipeline Definition
# src/pipelines/zenml_pipeline.py
import sys
from typing import Dict

import pandas as pd
from zenml import pipeline, step
from zenml.integrations.mlflow.flavors import MLFlowExperimentTrackerSettings

sys.path.append('src/python')
from ner_classifier import AdaptiveNERClassifier
from category_discovery import CategoryDiscovery
from train_model import MLClassifierTrainer

# Configure MLflow integration
mlflow_settings = MLFlowExperimentTrackerSettings(
    experiment_name="NER-ZenML-Pipeline",
    nested=True
)

@step
def load_data(data_path: str) -> pd.DataFrame:
    """Load and validate transaction data."""
    df = pd.read_csv(data_path)

    # Validation
    required_cols = ['narration', 'amount']
    missing = set(required_cols) - set(df.columns)
    if missing:
        raise ValueError(f"Missing required columns: {missing}")

    print(f"✓ Loaded {len(df)} transactions")
    if 'date' in df.columns:
        print(f"  Date range: {df['date'].min()} to {df['date'].max()}")
    print(f"  Amount range: ${df['amount'].min():.2f} to ${df['amount'].max():.2f}")
    return df

@step
def rule_based_classification(df: pd.DataFrame) -> pd.DataFrame:
    """Apply rule-based NER classification."""
    classifier = AdaptiveNERClassifier()
    classified = classifier.classify_batch(df)
    stats = classifier.get_stats()
    print("✓ Rule-based classification complete")
    print(f"  Coverage: {stats['rule_based_pct']:.1f}%")
    return classified

@step
def discover_categories(df: pd.DataFrame) -> Dict:
    """Discover new categories from unknown items."""
    discovery = CategoryDiscovery()
    unknown_texts = df[df['category'] == 'Unknown']['narration'].tolist()
    new_cats = discovery.discover_categories(unknown_texts)
    print("✓ Category discovery complete")
    print(f"  Found {len(new_cats)} potential new categories")
    return new_cats

@step
def train_ml_classifier(df: pd.DataFrame) -> MLClassifierTrainer:
    """Train ML classifier on labeled data."""
    trainer = MLClassifierTrainer()
    success = trainer.train(df)
    if success:
        print("✓ ML training complete")
    else:
        print("⚠️ ML training skipped (insufficient data)")
    return trainer

@step
def final_classification(
    df: pd.DataFrame,
    trainer: MLClassifierTrainer
) -> pd.DataFrame:
    """Final classification with the trained model."""
    if trainer.classifier is not None:
        final = trainer.predict(df)
        print("✓ Final classification complete")
    else:
        final = df
        print("⚠️ Using rule-based classification only")
    return final

@step
def generate_metrics(results: pd.DataFrame, new_cats: Dict) -> Dict:
    """Calculate comprehensive metrics."""
    # Cast NumPy scalars to plain Python types so the dict is JSON-serializable
    metrics = {
        'total_transactions': len(results),
        'coverage': float((results['category'] != 'Unknown').sum() / len(results)),
        'avg_confidence': float(results['confidence'].mean()),
        'discovered_categories': len(new_cats),
        'review_required': int((results['confidence'] < 0.5).sum()),
        'category_distribution': {
            k: int(v) for k, v in results['category'].value_counts().items()
        },
        'amount_by_category': {
            k: float(v) for k, v in results.groupby('category')['amount'].sum().items()
        }
    }
    print("\n=== Pipeline Metrics ===")
    print(f"Coverage: {metrics['coverage']:.2%}")
    print(f"Avg Confidence: {metrics['avg_confidence']:.3f}")
    print(f"Review Required: {metrics['review_required']} transactions")
    return metrics

@step
def save_results(
    results: pd.DataFrame,
    metrics: Dict,
    new_cats: Dict
) -> str:
    """Save all results and artifacts."""
    import json

    # Save classified transactions
    output_path = "data/processed/final_results.csv"
    results.to_csv(output_path, index=False)

    # Save metrics
    with open("data/processed/metrics.json", 'w') as f:
        json.dump(metrics, f, indent=2)

    # Save discovered categories
    with open("data/processed/discovered_categories.json", 'w') as f:
        json.dump(new_cats, f, indent=2)

    print(f"✓ Results saved to {output_path}")
    return output_path

@pipeline(settings={"experiment_tracker.mlflow": mlflow_settings})
def ner_classification_pipeline(data_path: str):
    """
    Complete NER classification pipeline with MLOps tracking.

    Steps:
    1. Load and validate data
    2. Rule-based classification
    3. Discover new categories
    4. Train ML classifier
    5. Final classification
    6. Generate metrics
    7. Save results
    """
    # Load data
    df = load_data(data_path)
    # Rule-based classification
    classified = rule_based_classification(df)
    # Discover new categories
    new_cats = discover_categories(classified)
    # Train ML model
    trainer = train_ml_classifier(classified)
    # Final classification (pass the rule-classified frame so the
    # rule-based fallback path still carries category/confidence columns)
    final_results = final_classification(classified, trainer)
    # Generate metrics
    metrics = generate_metrics(final_results, new_cats)
    # Save everything
    output_path = save_results(final_results, metrics, new_cats)
    return output_path

# For local execution
if __name__ == "__main__":
    data_path = sys.argv[1] if len(sys.argv) > 1 else "data/sample_transactions.csv"
    print("Starting NER Classification Pipeline...")
    print(f"Data: {data_path}\n")
    result = ner_classification_pipeline(data_path=data_path)
    print(f"\n✓ Pipeline complete! Results: {result}")
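Because ZenML steps are plain Python functions, their logic can be sanity-checked without spinning up the pipeline. A quick check of the same computations `generate_metrics` performs, run against a made-up frame that mirrors the pipeline's output schema (the rows here are invented for illustration):

```python
import pandas as pd

# Toy frame mirroring the pipeline's output columns.
results = pd.DataFrame({
    "narration":  ["walmart grocery", "unknown merchant", "uber ride"],
    "category":   ["Groceries", "Unknown", "Transport"],
    "confidence": [0.95, 0.20, 0.88],
    "amount":     [125.50, 10.00, 28.50],
})

# Same formulas as the generate_metrics step.
coverage = (results["category"] != "Unknown").sum() / len(results)
review_required = int((results["confidence"] < 0.5).sum())

assert abs(coverage - 2 / 3) < 1e-9   # 2 of 3 rows categorized
assert review_required == 1           # one row below the 0.5 threshold
```

Keeping step bodies free of orchestration concerns is what makes this kind of direct testing possible.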
ZenML Features Used
1. Step Caching
- ZenML automatically caches step outputs
- Rerun pipeline → only changed steps execute
- Saves time during development
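The caching idea is simple: fingerprint a step's inputs (and code), and reuse the stored output whenever the fingerprint is unchanged. A toy illustration of the mechanism, not ZenML's actual implementation:

```python
import hashlib
import json

_cache = {}  # fingerprint -> stored step output

def cached_step(func, **inputs):
    """Re-run `func` only when its name + inputs hash to an unseen key."""
    key = hashlib.sha256(
        json.dumps({"step": func.__name__, "inputs": inputs},
                   sort_keys=True).encode()
    ).hexdigest()
    if key not in _cache:        # cache miss: execute and store
        _cache[key] = func(**inputs)
    return _cache[key]           # cache hit: reuse stored output

calls = []
def expensive(x):
    calls.append(x)
    return x * 2

cached_step(expensive, x=3)
cached_step(expensive, x=3)   # identical input: served from cache
cached_step(expensive, x=4)   # new input: executes again
print(len(calls))  # 2
```

ZenML does this per step automatically, which is why rerunning the pipeline after editing only `generate_metrics` skips the earlier, slower steps.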
2. Artifact Tracking
- Every step’s input/output versioned
- Full lineage from raw data to predictions
- Reproducible pipelines
3. Stack Components
- Orchestrator: Local, Airflow, or Kubernetes
- Artifact Store: Local, S3, or GCS
- Experiment Tracker: MLflow integration
- Model Deployer: Seldon, KServe, etc.
4. Pipeline Scheduling
# Schedule daily retraining (requires an orchestrator that supports schedules)
from zenml.config.schedule import Schedule

schedule = Schedule(cron_expression="0 2 * * *")  # 2 AM daily
scheduled_pipeline = ner_classification_pipeline.with_options(schedule=schedule)
Running the Pipeline
# Initialize ZenML (first time only)
zenml init
# Register an MLflow experiment tracker
zenml experiment-tracker register mlflow_tracker --flavor=mlflow
# Register a stack that includes the tracker and make it active
zenml stack register mlflow_stack -o default -a default -e mlflow_tracker --set
# Run pipeline
python src/pipelines/zenml_pipeline.py data/sample_transactions.csv
# View pipeline runs
zenml pipeline runs list
# View specific run
zenml pipeline runs get <run_id>
R Integration & Interoperability
Calling Python from R
# src/R/python_integration.R
library(reticulate)
library(tidyverse)

# Configure Python environment
use_virtualenv("~/PycharmProjects/Local_NER/venv", required = TRUE)

# Make the project's Python modules importable
# (avoid naming this `py`, which reticulate already exports)
py_sys <- import("sys")
py_sys$path <- c(py_sys$path, "src/python")
ner <- import("ner_classifier")
train_module <- import("train_model")

# Classify transactions using the Python NER pipeline from R.
#   data_path:   path to CSV with transaction data
#   output_path: optional path to save results
# Returns a tibble with classification results.
classify_transactions_r <- function(data_path, output_path = NULL) {
  cat("Starting Python NER pipeline...\n")
  result <- train_module$train_and_log_model(data_path)

  # train_and_log_model returns (classifier, classified_df)
  classifier <- result[[1]]
  classified_df <- result[[2]]

  # Convert to an R tibble
  results_tbl <- classified_df %>%
    as_tibble() %>%
    mutate(
      category = as.factor(category),
      method = as.factor(method),
      needs_review = as.logical(needs_review)
    )

  cat("\n✓ Classification complete\n")
  cat("  Transactions:", nrow(results_tbl), "\n")
  cat("  Categories:", n_distinct(results_tbl$category), "\n")
  cat("  Avg confidence:", mean(results_tbl$confidence), "\n")

  # Optionally save
  if (!is.null(output_path)) {
    write_csv(results_tbl, output_path)
    cat("  Saved to:", output_path, "\n")
  }

  return(results_tbl)
}

# Load a saved classifier for inference.
load_classifier_r <- function(model_path = "models/ner_classifier.pkl") {
  classifier <- ner$AdaptiveNERClassifier()

  # reticulate::py_load_object() unpickles via Python's pickle module
  model_data <- py_load_object(model_path)
  classifier$vectorizer <- model_data$vectorizer
  classifier$ml_classifier <- model_data$classifier
  classifier$rules <- model_data$rules

  return(classifier)
}

# Classify a single transaction.
classify_single_r <- function(classifier, narration, amount = 0) {
  result <- classifier$classify_single(narration, amount)
  tibble(
    narration = result$narration,
    amount = result$amount,
    category = result$category,
    confidence = result$confidence,
    method = result$method,
    needs_review = result$needs_review
  )
}

# Classify a batch of transactions from an R data frame.
classify_batch_r <- function(classifier, df) {
  # Convert the R data frame to a pandas DataFrame
  pdf <- r_to_py(df)

  # Classify, then convert back to R
  result_pdf <- classifier$classify_batch(pdf)
  result_df <- py_to_r(result_pdf) %>% as_tibble()
  return(result_df)
}
Data Transfer Between R and Python
# Example usage
library(tidyverse)
library(reticulate)

# Prepare data in R
transactions <- tribble(
  ~narration,                   ~amount, ~date,
  "walmart grocery shopping",   125.50,  "2026-01-15",
  "cvs pharmacy prescription",   45.00,  "2026-01-16",
  "uber ride downtown",          28.50,  "2026-01-17"
) %>%
  mutate(date = as.Date(date))

# Save for Python
write_csv(transactions, "data/temp_transactions.csv")

# Run Python classification
results <- classify_transactions_r("data/temp_transactions.csv")

# Analyze in R
results %>%
  count(category, sort = TRUE) %>%
  ggplot(aes(x = reorder(category, n), y = n)) +
  geom_col(fill = "steelblue") +
  coord_flip() +
  labs(title = "Transaction Categories", x = NULL, y = "Count")
Handling R ↔ Python Data Types
| R Type | Python Type | Conversion |
|---|---|---|
| numeric | float | Automatic |
| integer | int | Automatic |
| character | str | Automatic |
| factor | str | Manual (as.character) |
| Date | datetime | Use py_to_r/r_to_py |
| data.frame | pandas.DataFrame | r_to_py(df) |
| tibble | pandas.DataFrame | r_to_py(df) |
| list | list/dict | Context-dependent |
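Dates are the usual stumbling block in this table: depending on the reticulate version and conversion settings, an R `Date` may arrive in Python as a string rather than a datetime. A defensive sketch on the Python side that normalizes either form (the `date` and `amount` column names follow the examples in this guide):

```python
import pandas as pd

# Whatever reticulate hands over -- date strings or datetimes --
# normalize the column once before any time-based analysis.
df = pd.DataFrame({
    "date": ["2026-01-15", "2026-01-16"],
    "amount": [125.5, 45.0],
})
df["date"] = pd.to_datetime(df["date"], errors="coerce")  # unparseable -> NaT

assert str(df["date"].dtype) == "datetime64[ns]"
```

Doing this normalization at the Python boundary keeps every downstream step agnostic to which R object type the caller started from.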
Automated Reporting System
R Markdown Report Template
---
title: "NER Classification Assessment Report"
subtitle: "Automated MLOps Pipeline Results"
author: "Transaction Classification System"
date: "`r Sys.Date()`"
output:
  html_document:
    toc: true
    toc_depth: 3
    toc_float:
      collapsed: false
      smooth_scroll: true
    theme: united
    code_folding: hide
    df_print: paged
params:
  results_path: "data/processed/final_results.csv"
  metrics_path: "data/processed/metrics.json"
  run_id: "latest"
---

knitr::opts_chunk$set(
  echo = TRUE,
  warning = FALSE,
  message = FALSE,
  fig.width = 12,
  fig.height = 8,
  dpi = 300
)

library(tidyverse)
library(knitr)
library(kableExtra)
library(DT)
library(plotly)
library(scales)
library(jsonlite)
Executive Summary
# Load classification results
results <- read_csv(params$results_path) %>%
  mutate(
    category = as.factor(category),
    method = as.factor(method)
  )

# Load metrics
metrics <- fromJSON(params$metrics_path)

# Calculate key metrics
total_transactions <- nrow(results)
coverage_rate <- mean(results$category != "Unknown")
avg_confidence <- mean(results$confidence)
review_required <- sum(results$needs_review)
ml_usage_rate <- mean(results$method == "ml-based")
Pipeline Run Summary
- Total Transactions: `r format(total_transactions, big.mark=",")`
- Coverage Rate: `r percent(coverage_rate, accuracy=0.1)`
- Average Confidence: `r round(avg_confidence, 3)`
- Review Required: `r format(review_required, big.mark=",")` (`r percent(review_required/total_transactions, accuracy=0.1)`)
- ML Classification Rate: `r percent(ml_usage_rate, accuracy=0.1)`
# Category Distribution
## Transaction Count by Category
category_summary <- results %>%
  group_by(category) %>%
  summarise(
    transactions = n(),
    total_amount = sum(abs(amount)),
    avg_amount = mean(abs(amount)),
    avg_confidence = mean(confidence),
    review_pct = mean(needs_review) * 100,
    .groups = "drop"
  ) %>%
  arrange(desc(transactions))

category_summary %>%
  kable(
    caption = "Category Summary Statistics",
    col.names = c("Category", "Transactions", "Total Amount",
                  "Avg Amount", "Avg Confidence", "Review %"),
    digits = c(0, 0, 2, 2, 3, 1),
    format.args = list(big.mark = ",")
  ) %>%
  kable_styling(
    bootstrap_options = c("striped", "hover", "condensed"),
    full_width = FALSE
  ) %>%
  row_spec(0, bold = TRUE, color = "white", background = "#3498db")
## Interactive Pie Chart
plot_ly(
category_summa