Optimization Algorithms in Deep Learning: From SGD to Adam
Training deep neural networks is fundamentally an optimization problem. Let's explore the evolution of optimization algorithms and understand their mathematical foundations.
The Optimization Problem
Deep learning aims to minimize a loss function:

$$\min_{\theta} J(\theta) = \frac{1}{N} \sum_{i=1}^{N} \mathcal{L}\big(f(x_i; \theta),\ y_i\big)$$

Where:
- $\theta$ are the model parameters
- $f(x_i; \theta)$ is the model prediction
- $\mathcal{L}$ is the loss function
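As a concrete toy instance of this objective, the short sketch below evaluates the empirical risk for a mean-squared-error loss on a tiny linear model. The data, parameter values, and the helper name `empirical_risk` are made up purely for illustration.

```python
import numpy as np

# Toy data: N = 4 samples, 2 features (values chosen only for illustration)
X = np.array([[1.0, 2.0], [0.5, -1.0], [2.0, 0.0], [-1.0, 1.5]])
y = np.array([3.0, -0.5, 2.0, 0.5])
theta = np.array([1.0, 0.5])  # candidate parameters

def empirical_risk(theta, X, y):
    """J(theta) = (1/N) * sum of squared errors of the linear model f(x; theta) = x . theta."""
    predictions = X.dot(theta)  # f(x_i; theta) for every sample
    return np.mean((predictions - y) ** 2)

print(f"J(theta) = {empirical_risk(theta, X, y):.4f}")
```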
Gradient Descent Variants
1. Batch Gradient Descent
Updates parameters using the full dataset:

$$\theta \leftarrow \theta - \eta\, \nabla_{\theta} J(\theta)$$

Where $\eta$ is the learning rate.
```python
import numpy as np
import matplotlib.pyplot as plt


class BatchGradientDescent:
    def __init__(self, learning_rate=0.01):
        self.learning_rate = learning_rate
        self.history = {'loss': [], 'gradients': []}

    def compute_gradient(self, X, y, theta):
        """Compute gradient for linear regression."""
        m = len(y)
        predictions = X.dot(theta)
        gradient = (1/m) * X.T.dot(predictions - y)
        return gradient

    def optimize(self, X, y, theta_init, max_iters=1000, tolerance=1e-6):
        """Perform batch gradient descent."""
        theta = theta_init.copy()

        for i in range(max_iters):
            # Compute gradient on full dataset
            gradient = self.compute_gradient(X, y, theta)

            # Update parameters
            theta = theta - self.learning_rate * gradient

            # Compute loss
            predictions = X.dot(theta)
            loss = np.mean((predictions - y) ** 2)

            # Store history
            self.history['loss'].append(loss)
            self.history['gradients'].append(np.linalg.norm(gradient))

            # Check convergence
            if len(self.history['loss']) > 1:
                if abs(self.history['loss'][-1] - self.history['loss'][-2]) < tolerance:
                    break

        return theta


# Example usage
np.random.seed(42)
X = np.random.randn(1000, 3)
true_theta = np.array([2, -1, 0.5])
# Intercept of 1 so the target matches the bias column added below
y = 1.0 + X.dot(true_theta) + 0.1 * np.random.randn(1000)

# Add bias term
X = np.column_stack([np.ones(X.shape[0]), X])
true_theta = np.concatenate([[1], true_theta])

# Initialize and optimize
theta_init = np.random.randn(X.shape[1])
bgd = BatchGradientDescent(learning_rate=0.01)
theta_final = bgd.optimize(X, y, theta_init)

print(f"True parameters: {true_theta}")
print(f"Estimated parameters: {theta_final}")
```
2. Stochastic Gradient Descent (SGD)
Updates parameters using one sample $(x_i, y_i)$ at a time:

$$\theta \leftarrow \theta - \eta\, \nabla_{\theta} J(\theta; x_i, y_i)$$
```python
class StochasticGradientDescent:
    def __init__(self, learning_rate=0.01):
        self.learning_rate = learning_rate
        self.history = {'loss': []}

    def optimize(self, X, y, theta_init, epochs=100):
        """Perform stochastic gradient descent."""
        theta = theta_init.copy()
        n_samples = X.shape[0]

        for epoch in range(epochs):
            # Shuffle data
            indices = np.random.permutation(n_samples)
            epoch_loss = 0

            for i in indices:
                # Compute gradient for single sample
                xi, yi = X[i:i+1], y[i:i+1]
                prediction = xi.dot(theta)
                gradient = xi.T.dot(prediction - yi).flatten()

                # Update parameters
                theta = theta - self.learning_rate * gradient

                # Accumulate loss
                epoch_loss += (prediction - yi) ** 2

            # Average loss for epoch
            avg_loss = epoch_loss / n_samples
            self.history['loss'].append(avg_loss[0])

        return theta


sgd = StochasticGradientDescent(learning_rate=0.01)
theta_sgd = sgd.optimize(X, y, theta_init, epochs=100)
```
3. Mini-batch Gradient Descent
Balances between batch and stochastic approaches:

$$\theta \leftarrow \theta - \eta\, \nabla_{\theta} J(\theta; \mathcal{B}_t)$$

where $\mathcal{B}_t$ is a mini-batch of examples.
```python
class MiniBatchGD:
    def __init__(self, learning_rate=0.01, batch_size=32):
        self.learning_rate = learning_rate
        self.batch_size = batch_size
        self.history = {'loss': []}

    def create_batches(self, X, y):
        """Create mini-batches from data."""
        n_samples = X.shape[0]
        indices = np.random.permutation(n_samples)

        for start_idx in range(0, n_samples, self.batch_size):
            end_idx = min(start_idx + self.batch_size, n_samples)
            batch_indices = indices[start_idx:end_idx]
            yield X[batch_indices], y[batch_indices]

    def optimize(self, X, y, theta_init, epochs=100):
        """Perform mini-batch gradient descent."""
        theta = theta_init.copy()

        for epoch in range(epochs):
            epoch_loss = 0
            batch_count = 0

            for X_batch, y_batch in self.create_batches(X, y):
                # Compute gradient for batch
                predictions = X_batch.dot(theta)
                gradient = (1/len(y_batch)) * X_batch.T.dot(predictions - y_batch)

                # Update parameters
                theta = theta - self.learning_rate * gradient

                # Accumulate loss
                epoch_loss += np.mean((predictions - y_batch) ** 2)
                batch_count += 1

            # Average loss for epoch
            avg_loss = epoch_loss / batch_count
            self.history['loss'].append(avg_loss)

        return theta


mbgd = MiniBatchGD(learning_rate=0.01, batch_size=32)
theta_mbgd = mbgd.optimize(X, y, theta_init, epochs=100)
```
Advanced Optimization Algorithms
1. SGD with Momentum
Adds momentum to accelerate convergence:

$$v_t = \gamma\, v_{t-1} + \eta\, \nabla_{\theta} J(\theta), \qquad \theta \leftarrow \theta - v_t$$

where $\gamma$ is the momentum coefficient.
```python
class SGDMomentum:
    def __init__(self, learning_rate=0.01, momentum=0.9):
        self.learning_rate = learning_rate
        self.momentum = momentum
        self.velocity = None
        self.history = {'loss': []}

    def optimize(self, X, y, theta_init, epochs=100):
        """SGD with momentum."""
        theta = theta_init.copy()
        self.velocity = np.zeros_like(theta)

        for epoch in range(epochs):
            epoch_loss = 0
            batch_count = 0

            for X_batch, y_batch in self.create_batches(X, y):
                # Compute gradient
                predictions = X_batch.dot(theta)
                gradient = (1/len(y_batch)) * X_batch.T.dot(predictions - y_batch)

                # Update velocity and parameters
                self.velocity = self.momentum * self.velocity + self.learning_rate * gradient
                theta = theta - self.velocity

                # Accumulate loss
                epoch_loss += np.mean((predictions - y_batch) ** 2)
                batch_count += 1

            avg_loss = epoch_loss / batch_count
            self.history['loss'].append(avg_loss)

        return theta

    def create_batches(self, X, y):
        """Create mini-batches."""
        n_samples = X.shape[0]
        batch_size = 32
        indices = np.random.permutation(n_samples)

        for start_idx in range(0, n_samples, batch_size):
            end_idx = min(start_idx + batch_size, n_samples)
            batch_indices = indices[start_idx:end_idx]
            yield X[batch_indices], y[batch_indices]


momentum_optimizer = SGDMomentum(learning_rate=0.01, momentum=0.9)
theta_momentum = momentum_optimizer.optimize(X, y, theta_init, epochs=100)
```
2. RMSprop
Adapts learning rates based on gradient magnitudes:

$$E[g^2]_t = \rho\, E[g^2]_{t-1} + (1-\rho)\, g_t^2, \qquad \theta \leftarrow \theta - \frac{\eta}{\sqrt{E[g^2]_t} + \epsilon}\, g_t$$

where $\rho$ is the decay rate.
```python
class RMSprop:
    def __init__(self, learning_rate=0.001, decay_rate=0.9, epsilon=1e-8):
        self.learning_rate = learning_rate
        self.decay_rate = decay_rate
        self.epsilon = epsilon
        self.cache = None
        self.history = {'loss': []}

    def optimize(self, X, y, theta_init, epochs=100):
        """RMSprop optimization."""
        theta = theta_init.copy()
        self.cache = np.zeros_like(theta)

        for epoch in range(epochs):
            epoch_loss = 0
            batch_count = 0

            for X_batch, y_batch in self.create_batches(X, y):
                # Compute gradient
                predictions = X_batch.dot(theta)
                gradient = (1/len(y_batch)) * X_batch.T.dot(predictions - y_batch)

                # Update cache (exponential moving average of squared gradients)
                self.cache = self.decay_rate * self.cache + (1 - self.decay_rate) * gradient**2

                # Update parameters
                theta = theta - self.learning_rate * gradient / (np.sqrt(self.cache) + self.epsilon)

                # Accumulate loss
                epoch_loss += np.mean((predictions - y_batch) ** 2)
                batch_count += 1

            avg_loss = epoch_loss / batch_count
            self.history['loss'].append(avg_loss)

        return theta

    def create_batches(self, X, y):
        """Create mini-batches."""
        n_samples = X.shape[0]
        batch_size = 32
        indices = np.random.permutation(n_samples)

        for start_idx in range(0, n_samples, batch_size):
            end_idx = min(start_idx + batch_size, n_samples)
            batch_indices = indices[start_idx:end_idx]
            yield X[batch_indices], y[batch_indices]


rmsprop = RMSprop(learning_rate=0.001)
theta_rmsprop = rmsprop.optimize(X, y, theta_init, epochs=100)
```
3. Adam (Adaptive Moment Estimation)
Combines momentum and RMSprop:

$$m_t = \beta_1 m_{t-1} + (1-\beta_1)\, g_t, \qquad v_t = \beta_2 v_{t-1} + (1-\beta_2)\, g_t^2$$

$$\hat{m}_t = \frac{m_t}{1-\beta_1^t}, \qquad \hat{v}_t = \frac{v_t}{1-\beta_2^t}, \qquad \theta \leftarrow \theta - \frac{\eta\, \hat{m}_t}{\sqrt{\hat{v}_t} + \epsilon}$$
```python
class Adam:
    def __init__(self, learning_rate=0.001, beta1=0.9, beta2=0.999, epsilon=1e-8):
        self.learning_rate = learning_rate
        self.beta1 = beta1
        self.beta2 = beta2
        self.epsilon = epsilon
        self.m = None  # First moment
        self.v = None  # Second moment
        self.t = 0     # Time step
        self.history = {'loss': []}

    def optimize(self, X, y, theta_init, epochs=100):
        """Adam optimization."""
        theta = theta_init.copy()
        self.m = np.zeros_like(theta)
        self.v = np.zeros_like(theta)

        for epoch in range(epochs):
            epoch_loss = 0
            batch_count = 0

            for X_batch, y_batch in self.create_batches(X, y):
                self.t += 1

                # Compute gradient
                predictions = X_batch.dot(theta)
                gradient = (1/len(y_batch)) * X_batch.T.dot(predictions - y_batch)

                # Update biased first moment estimate
                self.m = self.beta1 * self.m + (1 - self.beta1) * gradient

                # Update biased second raw moment estimate
                self.v = self.beta2 * self.v + (1 - self.beta2) * gradient**2

                # Compute bias-corrected estimates
                m_hat = self.m / (1 - self.beta1**self.t)
                v_hat = self.v / (1 - self.beta2**self.t)

                # Update parameters
                theta = theta - self.learning_rate * m_hat / (np.sqrt(v_hat) + self.epsilon)

                # Accumulate loss
                epoch_loss += np.mean((predictions - y_batch) ** 2)
                batch_count += 1

            avg_loss = epoch_loss / batch_count
            self.history['loss'].append(avg_loss)

        return theta

    def create_batches(self, X, y):
        """Create mini-batches."""
        n_samples = X.shape[0]
        batch_size = 32
        indices = np.random.permutation(n_samples)

        for start_idx in range(0, n_samples, batch_size):
            end_idx = min(start_idx + batch_size, n_samples)
            batch_indices = indices[start_idx:end_idx]
            yield X[batch_indices], y[batch_indices]


adam = Adam(learning_rate=0.001)
theta_adam = adam.optimize(X, y, theta_init, epochs=100)
```
Comparative Analysis
```python
def compare_optimizers(X, y, theta_init, epochs=100):
    """Compare different optimization algorithms."""
    optimizers = {
        'SGD': SGDMomentum(learning_rate=0.01, momentum=0.0),
        'SGD + Momentum': SGDMomentum(learning_rate=0.01, momentum=0.9),
        'RMSprop': RMSprop(learning_rate=0.001),
        'Adam': Adam(learning_rate=0.001)
    }

    results = {}
    plt.figure(figsize=(15, 10))

    # Training comparison
    plt.subplot(2, 2, 1)
    for name, optimizer in optimizers.items():
        theta_final = optimizer.optimize(X, y, theta_init.copy(), epochs=epochs)
        plt.plot(optimizer.history['loss'], label=name)
        results[name] = {'final_theta': theta_final,
                         'final_loss': optimizer.history['loss'][-1]}

    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Training Loss Comparison')
    plt.legend()
    plt.grid(True)
    plt.yscale('log')

    # Convergence rate
    plt.subplot(2, 2, 2)
    for name, optimizer in optimizers.items():
        loss_diff = np.diff(optimizer.history['loss'])
        plt.plot(np.abs(loss_diff), label=name)

    plt.xlabel('Epoch')
    plt.ylabel('|Δ Loss|')
    plt.title('Convergence Rate')
    plt.legend()
    plt.grid(True)
    plt.yscale('log')

    # Final loss comparison
    plt.subplot(2, 2, 3)
    colors = ['red', 'green', 'blue', 'orange']
    for i, (name, result) in enumerate(results.items()):
        plt.bar(name, result['final_loss'], color=colors[i], alpha=0.7)

    plt.ylabel('Final Loss')
    plt.title('Final Loss Comparison')
    plt.xticks(rotation=45)
    plt.grid(True)

    # Learning rate sensitivity
    plt.subplot(2, 2, 4)
    learning_rates = [0.001, 0.01, 0.1, 1.0]
    for lr in learning_rates:
        adam_test = Adam(learning_rate=lr)
        theta_test = adam_test.optimize(X, y, theta_init.copy(), epochs=50)
        plt.plot(adam_test.history['loss'], label=f'LR={lr}')

    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Adam: Learning Rate Sensitivity')
    plt.legend()
    plt.grid(True)
    plt.yscale('log')

    plt.tight_layout()
    plt.show()

    return results


# Compare all optimizers
comparison_results = compare_optimizers(X, y, theta_init, epochs=100)

# Print final results
print("\nFinal Results:")
print("-" * 50)
for name, result in comparison_results.items():
    print(f"{name:15} | Loss: {result['final_loss']:.6f}")
```
Modern Optimizers
1. AdamW (Adam with Weight Decay)
Decouples weight decay from gradient-based optimization:

$$\theta_t = \theta_{t-1} - \eta \left( \frac{\hat{m}_t}{\sqrt{\hat{v}_t} + \epsilon} + \lambda\, \theta_{t-1} \right)$$

where $\lambda$ is the weight-decay coefficient.
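The snippet below is a minimal NumPy sketch of a single AdamW update step, written in the same style as the `Adam` class above. The helper name `adamw_step` and its state dictionary are my own illustrative choices, not part of any library.

```python
import numpy as np

def adamw_step(theta, gradient, state, lr=0.001, beta1=0.9, beta2=0.999,
               weight_decay=0.01, epsilon=1e-8):
    """One AdamW update: Adam moments plus decoupled weight decay applied directly to theta."""
    state['t'] += 1
    state['m'] = beta1 * state['m'] + (1 - beta1) * gradient
    state['v'] = beta2 * state['v'] + (1 - beta2) * gradient ** 2
    m_hat = state['m'] / (1 - beta1 ** state['t'])
    v_hat = state['v'] / (1 - beta2 ** state['t'])
    # Weight decay is NOT folded into the gradient (as L2 regularization would be);
    # it is applied as a separate shrinkage term on the parameters.
    return theta - lr * (m_hat / (np.sqrt(v_hat) + epsilon) + weight_decay * theta)

# Usage: state = {'m': np.zeros_like(theta), 'v': np.zeros_like(theta), 't': 0}
```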
2. RAdam (Rectified Adam)
Addresses the need for learning-rate warmup in Adam by rectifying the variance of the adaptive learning rate:

$$\rho_\infty = \frac{2}{1-\beta_2} - 1, \qquad \rho_t = \rho_\infty - \frac{2\, t\, \beta_2^t}{1 - \beta_2^t}$$

$$r_t = \sqrt{\frac{(\rho_t - 4)(\rho_t - 2)\, \rho_\infty}{(\rho_\infty - 4)(\rho_\infty - 2)\, \rho_t}}$$

Where $\rho_t$ is the length of the approximated SMA (Simple Moving Average). The rectification term $r_t$ is applied only once $\rho_t$ is large enough; in the earliest steps the update falls back to plain momentum.
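As a rough sketch of that rectification logic: the function name `radam_step`, its state dictionary, and the `rho_t > 4` switch follow my reading of the RAdam paper, so treat the details as an assumption rather than a reference implementation.

```python
import numpy as np

def radam_step(theta, gradient, state, lr=0.001, beta1=0.9, beta2=0.999, epsilon=1e-8):
    """One RAdam update: use the adaptive step only once the variance of v_t is tractable."""
    state['t'] += 1
    t = state['t']
    state['m'] = beta1 * state['m'] + (1 - beta1) * gradient
    state['v'] = beta2 * state['v'] + (1 - beta2) * gradient ** 2
    m_hat = state['m'] / (1 - beta1 ** t)

    rho_inf = 2.0 / (1.0 - beta2) - 1.0
    rho_t = rho_inf - 2.0 * t * beta2 ** t / (1.0 - beta2 ** t)

    if rho_t > 4.0:
        # Variance is tractable: apply the rectified adaptive step
        v_hat = state['v'] / (1 - beta2 ** t)
        r_t = np.sqrt(((rho_t - 4) * (rho_t - 2) * rho_inf) /
                      ((rho_inf - 4) * (rho_inf - 2) * rho_t))
        return theta - lr * r_t * m_hat / (np.sqrt(v_hat) + epsilon)
    # Early steps: fall back to an un-adapted momentum (SGD-like) update
    return theta - lr * m_hat
```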
3. AdaBound
Combines Adam and SGD benefits by clipping Adam's per-parameter step size between bounds that converge to a single SGD-style learning rate:

$$\eta_t = \mathrm{Clip}\!\left(\frac{\alpha}{\sqrt{\hat{v}_t}},\ \eta_l(t),\ \eta_u(t)\right), \qquad \theta_{t+1} = \theta_t - \eta_t \odot \hat{m}_t$$

where $\eta_l(t)$ and $\eta_u(t)$ both converge to the final SGD learning rate as $t \to \infty$.
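A minimal sketch of the bounded update follows. The bound schedules below (controlled by an assumed `gamma` hyperparameter and a `final_lr` target) are illustrative choices in the spirit of the AdaBound paper, not guaranteed to match its exact defaults; the function name `adabound_step` is mine.

```python
import numpy as np

def adabound_step(theta, gradient, state, lr=0.001, final_lr=0.1,
                  beta1=0.9, beta2=0.999, gamma=1e-3, epsilon=1e-8):
    """One AdaBound-style update: clip Adam's per-parameter step size into a shrinking band."""
    state['t'] += 1
    t = state['t']
    state['m'] = beta1 * state['m'] + (1 - beta1) * gradient
    state['v'] = beta2 * state['v'] + (1 - beta2) * gradient ** 2
    m_hat = state['m'] / (1 - beta1 ** t)
    v_hat = state['v'] / (1 - beta2 ** t)

    # Lower/upper bounds start wide (Adam-like) and tighten toward final_lr (SGD-like)
    lower = final_lr * (1.0 - 1.0 / (gamma * t + 1.0))
    upper = final_lr * (1.0 + 1.0 / (gamma * t))
    step_size = np.clip(lr / (np.sqrt(v_hat) + epsilon), lower, upper)
    return theta - step_size * m_hat
```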
Practical Implementation with TensorFlow
```python
import tensorflow as tf


def create_optimizer_comparison_model():
    """Create a simple neural network for optimizer comparison."""
    # Generate synthetic dataset
    np.random.seed(42)
    X_train = np.random.randn(1000, 10)
    y_train = np.sum(X_train[:, :3] ** 2, axis=1) + 0.1 * np.random.randn(1000)

    # Test different optimizers
    optimizers = {
        'SGD': tf.keras.optimizers.SGD(learning_rate=0.01),
        'SGD + Momentum': tf.keras.optimizers.SGD(learning_rate=0.01, momentum=0.9),
        'RMSprop': tf.keras.optimizers.RMSprop(learning_rate=0.001),
        'Adam': tf.keras.optimizers.Adam(learning_rate=0.001),
        # AdamW requires a reasonably recent TensorFlow release
        'AdamW': tf.keras.optimizers.AdamW(learning_rate=0.001, weight_decay=0.01)
    }

    histories = {}

    for name, optimizer in optimizers.items():
        print(f"\nTraining with {name}...")

        # Build a fresh model for each optimizer so every run starts from new weights
        model = tf.keras.Sequential([
            tf.keras.layers.Dense(64, activation='relu', input_shape=(10,)),
            tf.keras.layers.Dense(32, activation='relu'),
            tf.keras.layers.Dense(1)
        ])

        model.compile(optimizer=optimizer, loss='mse')

        history = model.fit(
            X_train, y_train,
            epochs=100,
            batch_size=32,
            validation_split=0.2,
            verbose=0
        )

        histories[name] = history.history

    return histories


# Run comparison
tf_histories = create_optimizer_comparison_model()

# Plot TensorFlow results
plt.figure(figsize=(15, 5))

plt.subplot(1, 2, 1)
for name, history in tf_histories.items():
    plt.plot(history['loss'], label=name)
plt.xlabel('Epoch')
plt.ylabel('Training Loss')
plt.title('TensorFlow Optimizer Comparison - Training')
plt.legend()
plt.grid(True)
plt.yscale('log')

plt.subplot(1, 2, 2)
for name, history in tf_histories.items():
    plt.plot(history['val_loss'], label=name)
plt.xlabel('Epoch')
plt.ylabel('Validation Loss')
plt.title('TensorFlow Optimizer Comparison - Validation')
plt.legend()
plt.grid(True)
plt.yscale('log')

plt.tight_layout()
plt.show()
```
Optimizer Selection Guidelines
| Optimizer | Best For | Hyperparameters | Pros | Cons |
|-----------|----------|-----------------|------|------|
| SGD | Simple problems | Learning rate | Simple, well understood | Slow convergence |
| SGD + Momentum | Most problems | LR, momentum | Faster than SGD | Still requires manual tuning |
| RMSprop | RNNs, non-stationary objectives | LR, decay rate | Adaptive learning rates | Can be unstable |
| Adam | Most deep learning | LR, betas | Fast, robust | Memory overhead |
| AdamW | Large models | LR, weight decay | Better generalization | More hyperparameters |
Learning Rate Scheduling
```python
def learning_rate_schedules():
    """Demonstrate different learning rate schedules."""
    epochs = 100

    # Step decay
    def step_decay(epoch, initial_lr=0.1, drop=0.5, epochs_drop=20):
        return initial_lr * np.power(drop, np.floor(epoch / epochs_drop))

    # Exponential decay
    def exp_decay(epoch, initial_lr=0.1, k=0.1):
        return initial_lr * np.exp(-k * epoch)

    # Cosine annealing
    def cosine_decay(epoch, initial_lr=0.1, max_epochs=100):
        return initial_lr * (1 + np.cos(np.pi * epoch / max_epochs)) / 2

    # Plot schedules
    epochs_range = np.arange(epochs)

    plt.figure(figsize=(12, 4))

    plt.subplot(1, 3, 1)
    plt.plot(epochs_range, [step_decay(e) for e in epochs_range], label='Step Decay')
    plt.xlabel('Epoch')
    plt.ylabel('Learning Rate')
    plt.title('Step Decay Schedule')
    plt.grid(True)

    plt.subplot(1, 3, 2)
    plt.plot(epochs_range, [exp_decay(e) for e in epochs_range], label='Exponential Decay')
    plt.xlabel('Epoch')
    plt.ylabel('Learning Rate')
    plt.title('Exponential Decay Schedule')
    plt.grid(True)

    plt.subplot(1, 3, 3)
    plt.plot(epochs_range, [cosine_decay(e) for e in epochs_range], label='Cosine Annealing')
    plt.xlabel('Epoch')
    plt.ylabel('Learning Rate')
    plt.title('Cosine Annealing Schedule')
    plt.grid(True)

    plt.tight_layout()
    plt.show()


learning_rate_schedules()
```
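To actually apply one of these schedules during training, one option is Keras's `LearningRateScheduler` callback, which calls a Python function once per epoch. The sketch below is a minimal example: the toy data, the model architecture, and the `cosine_schedule` helper (which reuses the cosine formula from above) are illustrative assumptions, not taken from the code earlier in this post.

```python
import numpy as np
import tensorflow as tf

# Toy regression data (illustrative only)
X_train = np.random.randn(512, 10)
y_train = np.sum(X_train[:, :3] ** 2, axis=1) + 0.1 * np.random.randn(512)

def cosine_schedule(epoch, lr, initial_lr=0.01, max_epochs=50):
    """Cosine annealing; the incoming lr is ignored and recomputed from the epoch index."""
    return initial_lr * (1 + np.cos(np.pi * epoch / max_epochs)) / 2

model = tf.keras.Sequential([
    tf.keras.layers.Dense(32, activation='relu', input_shape=(10,)),
    tf.keras.layers.Dense(1)
])
model.compile(optimizer=tf.keras.optimizers.SGD(learning_rate=0.01, momentum=0.9), loss='mse')

# The callback overrides the optimizer's learning rate at the start of every epoch
model.fit(X_train, y_train, epochs=50, batch_size=32, verbose=0,
          callbacks=[tf.keras.callbacks.LearningRateScheduler(cosine_schedule)])
```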
Conclusion
Choosing the right optimizer is crucial for successful deep learning:
- Start with Adam: Good default choice for most problems
- Try SGD with momentum: Often better final performance
- Experiment with learning rates: Critical hyperparameter
- Use scheduling: Improve convergence and final performance
- Monitor training: Watch for overfitting and instability
Remember: no single optimizer works best for all problems. Understanding the mathematical foundations helps you make informed choices and debug training issues when they arise.