Q-learning, DQN, PPO, A3C, policy gradient methods, multi-agent systems, and Gym environments. Use for training agents, game AI, robotics, or decision-making systems.
Resources (3)
Install
npx skillscat add pluginagentmarketplace/custom-plugin-ai-data-scientist/reinforcement-learning Install via the SkillsCat registry.
SKILL.md
Reinforcement Learning
Train intelligent agents that learn optimal behavior through interaction with environments.
Quick Start
OpenAI Gymnasium Setup
import gymnasium as gym
import numpy as np
# Build the CartPole environment used for the demo
env = gym.make('CartPole-v1')

# Show which observations the agent receives and which actions it may take
print(f"Observation space: {env.observation_space}")
print(f"Action space: {env.action_space}")

# Drive the environment with uniformly random actions for 1000 steps,
# starting a fresh episode whenever the current one ends.
observation, info = env.reset()
for _ in range(1000):
    action = env.action_space.sample()  # random action, no learning yet
    observation, reward, terminated, truncated, info = env.step(action)
    if not (terminated or truncated):
        continue
    observation, info = env.reset()
env.close()

# --- Q-Learning (Tabular) ---
import numpy as np
class QLearning:
    """Tabular Q-learning agent for discrete state and action spaces.

    Args:
        n_states: Number of rows in the Q-table.
        n_actions: Number of columns in the Q-table.
        lr: Step size applied to the temporal-difference error.
        gamma: Discount factor applied to the bootstrapped next-state value.
        epsilon: Initial probability of taking a random action.
        epsilon_min: Floor below which epsilon is never decayed.
        epsilon_decay: Multiplicative decay applied to epsilon on every
            call to ``update``.
    """

    def __init__(self, n_states, n_actions, lr=0.1, gamma=0.99, epsilon=1.0,
                 epsilon_min=0.01, epsilon_decay=0.995):
        self.q_table = np.zeros((n_states, n_actions))
        self.lr = lr
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_min = epsilon_min
        self.epsilon_decay = epsilon_decay

    def get_action(self, state):
        """Return an epsilon-greedy action index for ``state``."""
        if np.random.random() < self.epsilon:
            return int(np.random.randint(self.q_table.shape[1]))
        # Break ties at random: a plain argmax always returns action 0 while
        # the table is still all zeros, which stalls exploration as soon as
        # epsilon has decayed.
        row = self.q_table[state]
        best_actions = np.flatnonzero(row == row.max())
        return int(np.random.choice(best_actions))

    def update(self, state, action, reward, next_state, done):
        """Apply one Q-learning backup, then decay epsilon.

        When ``done`` is true the target is the reward alone; otherwise it is
        the reward plus the discounted maximum Q-value of ``next_state``.
        """
        if done:
            target = reward
        else:
            target = reward + self.gamma * np.max(self.q_table[next_state])
        td_error = target - self.q_table[state, action]
        self.q_table[state, action] += self.lr * td_error
        # Decay epsilon, clamped so it never undershoots the floor
        if self.epsilon > self.epsilon_min:
            self.epsilon = max(self.epsilon_min,
                               self.epsilon * self.epsilon_decay)
# Training loop
env = gym.make('FrozenLake-v1')
# Size the Q-table from the environment's own spaces rather than hard-coding
# 16 states x 4 actions, so the loop also works for other discrete
# environments and map sizes.
agent = QLearning(
    n_states=env.observation_space.n,
    n_actions=env.action_space.n,
)
episode_rewards = []  # one total per episode, for monitoring progress
for episode in range(10000):
    state, _ = env.reset()
    total_reward = 0
    while True:
        action = agent.get_action(state)
        next_state, reward, terminated, truncated, _ = env.step(action)
        # Only `terminated` is passed as `done`: a truncated episode still
        # bootstraps from the next state's value.
        agent.update(state, action, reward, next_state, terminated)
        total_reward += reward
        state = next_state
        if terminated or truncated:
            break
    episode_rewards.append(total_reward)

# --- Deep Q-Network (DQN) ---
import random
from collections import deque

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
class DQN(nn.Module):
    """Fully connected Q-network: one state vector in, one value per action out."""

    def __init__(self, state_dim, action_dim, hidden_dim=128):
        super().__init__()
        # Two hidden ReLU layers followed by a linear output head
        layers = [
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, action_dim),
        ]
        self.network = nn.Sequential(*layers)

    def forward(self, x):
        """Return the network output for input ``x``."""
        q_values = self.network(x)
        return q_values
class ReplayBuffer:
    """Fixed-capacity experience replay buffer.

    Transitions are stored in a ``deque`` with ``maxlen=capacity``, so the
    oldest transition is dropped once the buffer is full.
    """

    def __init__(self, capacity=100000):
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        """Append one ``(state, action, reward, next_state, done)`` transition."""
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        """Draw ``batch_size`` distinct transitions uniformly at random.

        Returns:
            A tuple ``(states, actions, rewards, next_states, dones)`` of
            tensors; ``actions`` is int64 and the rest are float32.

        Raises:
            ValueError: If ``batch_size`` exceeds the number of stored
                transitions (raised by ``random.sample``).
        """
        batch = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)
        # Stack into one contiguous NumPy array per field before handing it to
        # torch: building a tensor straight from a tuple of ndarrays goes
        # element by element, which is slow and emits a UserWarning.
        return (
            torch.as_tensor(np.asarray(states, dtype=np.float32)),
            torch.as_tensor(np.asarray(actions, dtype=np.int64)),
            torch.as_tensor(np.asarray(rewards, dtype=np.float32)),
            torch.as_tensor(np.asarray(next_states, dtype=np.float32)),
            torch.as_tensor(np.asarray(dones, dtype=np.float32)),
        )

    def __len__(self):
        return len(self.buffer)
class DQNAgent:
    """DQN agent with a target network and experience replay.

    Args:
        state_dim: Input size of the Q-networks.
        action_dim: Number of discrete actions (output size of the networks).
        lr: Adam learning rate for the policy network.
        gamma: Discount factor used in the bootstrapped target.
        epsilon: Initial probability of taking a random action.
        epsilon_min: Floor below which epsilon is never decayed.
        epsilon_decay: Multiplicative decay applied after each training step.
    """

    def __init__(self, state_dim, action_dim, lr=1e-3, gamma=0.99,
                 epsilon=1.0, epsilon_min=0.01, epsilon_decay=0.995):
        self.action_dim = action_dim
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_min = epsilon_min
        self.epsilon_decay = epsilon_decay
        # Networks: the target net starts as an exact copy of the policy net
        self.policy_net = DQN(state_dim, action_dim)
        self.target_net = DQN(state_dim, action_dim)
        self.target_net.load_state_dict(self.policy_net.state_dict())
        self.optimizer = optim.Adam(self.policy_net.parameters(), lr=lr)
        self.buffer = ReplayBuffer()

    def get_action(self, state):
        """Return an epsilon-greedy action index for a single ``state``."""
        if np.random.random() < self.epsilon:
            return int(np.random.randint(self.action_dim))
        with torch.no_grad():
            state = torch.as_tensor(state, dtype=torch.float32).unsqueeze(0)
            q_values = self.policy_net(state)
            return q_values.argmax().item()

    def train(self, batch_size=64):
        """Run one gradient step on a batch sampled from the replay buffer.

        Returns:
            The scalar loss as a float, or ``None`` when the buffer holds
            fewer than ``batch_size`` transitions and no step was taken.
        """
        if len(self.buffer) < batch_size:
            return None
        states, actions, rewards, next_states, dones = self.buffer.sample(batch_size)
        # Q(s, a) for the actions actually taken. squeeze(1) drops only the
        # gather dimension; a bare squeeze() would also collapse a batch of
        # size 1 to a scalar and trigger a broadcasting warning in the loss.
        current_q = self.policy_net(states).gather(1, actions.unsqueeze(1)).squeeze(1)
        # Bootstrapped target from the target network; (1 - dones) removes
        # the next-state term for terminal transitions.
        with torch.no_grad():
            next_q = self.target_net(next_states).max(1)[0]
            target_q = rewards + self.gamma * next_q * (1 - dones)
        loss = nn.MSELoss()(current_q, target_q)
        # Optimize
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        # Decay epsilon, clamped so it never undershoots the floor
        if self.epsilon > self.epsilon_min:
            self.epsilon = max(self.epsilon_min,
                               self.epsilon * self.epsilon_decay)
        return loss.item()

    def update_target(self):
        """Copy the policy network's weights into the target network."""
        self.target_net.load_state_dict(self.policy_net.state_dict())

# --- Policy Gradient Methods ---
REINFORCE
import torch
import torch.nn as nn
import torch.optim as optim
from torch.distributions import Categorical
class PolicyNetwork(nn.Module):
    """Softmax policy used by REINFORCE: state vector in, action probabilities out."""

    def __init__(self, state_dim, action_dim, hidden_dim=128):
        super().__init__()
        # One hidden ReLU layer; the final softmax normalizes over the last axis
        hidden = nn.Linear(state_dim, hidden_dim)
        head = nn.Linear(hidden_dim, action_dim)
        self.network = nn.Sequential(hidden, nn.ReLU(), head, nn.Softmax(dim=-1))

    def forward(self, x):
        """Return the action probabilities for input ``x``."""
        return self.network(x)

    def get_action(self, state):
        """Sample an action for ``state``.

        Returns:
            ``(action, log_prob)``: the sampled action as a Python int, and
            the log-probability tensor of that action under the policy.
        """
        state_tensor = torch.FloatTensor(state)
        action_dist = Categorical(self.forward(state_tensor))
        sampled = action_dist.sample()
        return sampled.item(), action_dist.log_prob(sampled)
class REINFORCE:
    """REINFORCE (Monte Carlo policy gradient) with normalized returns.

    No learned baseline is used; the only variance reduction is that the
    per-episode returns are standardized (mean subtracted, divided by the
    standard deviation).

    Args:
        state_dim: Input size of the policy network.
        action_dim: Number of discrete actions.
        lr: Adam learning rate.
        gamma: Discount factor for the returns.
    """

    def __init__(self, state_dim, action_dim, lr=1e-3, gamma=0.99):
        self.policy = PolicyNetwork(state_dim, action_dim)
        self.optimizer = optim.Adam(self.policy.parameters(), lr=lr)
        self.gamma = gamma

    def compute_returns(self, rewards):
        """Return the discounted return for every step of one episode.

        Returns:
            A float32 tensor with one entry per reward. Episodes with more
            than one step are standardized; shorter ones are returned as-is.
        """
        returns = []
        running = 0.0
        # Accumulate backwards, then flip once; this avoids the quadratic
        # cost of inserting at the front of a list on every step.
        for r in reversed(rewards):
            running = r + self.gamma * running
            returns.append(running)
        returns.reverse()
        # Force float32 so integer rewards do not yield a Long tensor, on
        # which mean()/std() are not defined.
        returns = torch.tensor(returns, dtype=torch.float32)
        # Normalize for stable training. The sample std of a single value is
        # NaN, which would propagate into the loss and the weights, so
        # one-step episodes are left unnormalized.
        if returns.numel() > 1:
            returns = (returns - returns.mean()) / (returns.std() + 1e-8)
        return returns

    def update(self, log_probs, rewards):
        """Take one policy-gradient step from a finished episode.

        Args:
            log_probs: Log-probability tensors of the actions taken, in order.
            rewards: Rewards received after each of those actions.
        """
        if not log_probs:
            return  # empty episode: nothing to learn from
        returns = self.compute_returns(rewards)
        log_probs = torch.stack(log_probs)
        # Policy gradient loss: raise the log-probability of actions that
        # were followed by above-average returns
        loss = -(log_probs * returns).mean()
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
# Training
# Create the environment explicitly: this loop previously reused whatever
# `env` was left in scope, which does not match the 4-dimensional state and
# 2 actions assumed by the agent.
env = gym.make('CartPole-v1')
agent = REINFORCE(
    state_dim=env.observation_space.shape[0],
    action_dim=env.action_space.n,
)
for episode in range(1000):
    state, _ = env.reset()
    log_probs = []
    rewards = []
    # Roll out one full episode, recording what the update needs
    while True:
        action, log_prob = agent.policy.get_action(state)
        next_state, reward, terminated, truncated, _ = env.step(action)
        log_probs.append(log_prob)
        rewards.append(reward)
        state = next_state
        if terminated or truncated:
            break
    # One policy-gradient step per completed episode
    agent.update(log_probs, rewards)
env.close()

# --- Proximal Policy Optimization (PPO) ---
import torch
import torch.nn as nn
import torch.optim as optim
class ActorCritic(nn.Module):
    """Actor-critic network for PPO with a shared first layer.

    ``forward`` returns a pair: action probabilities from the actor head and
    a single value estimate from the critic head.
    """

    def __init__(self, state_dim, action_dim, hidden_dim=256):
        super().__init__()
        # Trunk shared by both heads
        self.features = nn.Sequential(nn.Linear(state_dim, hidden_dim), nn.ReLU())
        # Policy head: softmax over the action dimension
        policy_layers = [
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, action_dim),
            nn.Softmax(dim=-1),
        ]
        self.actor = nn.Sequential(*policy_layers)
        # Value head: one scalar per input
        value_layers = [
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 1),
        ]
        self.critic = nn.Sequential(*value_layers)

    def forward(self, x):
        shared = self.features(x)
        action_probs = self.actor(shared)
        state_value = self.critic(shared)
        return action_probs, state_value
class PPO:
    """Proximal Policy Optimization with a clipped surrogate objective.

    Args:
        state_dim: Input size of the actor-critic network.
        action_dim: Number of discrete actions.
        lr: Adam learning rate.
        gamma: Discount factor (stored on the instance).
        clip_ratio: Half-width of the probability-ratio clipping interval.
        epochs: Number of gradient steps taken per call to ``update``.
        batch_size: Stored on the instance; ``update`` currently trains on
            the full batch it is given and does not read this value.
    """

    def __init__(self, state_dim, action_dim, lr=3e-4, gamma=0.99,
                 clip_ratio=0.2, epochs=10, batch_size=64):
        self.model = ActorCritic(state_dim, action_dim)
        self.optimizer = optim.Adam(self.model.parameters(), lr=lr)
        self.gamma = gamma
        self.clip_ratio = clip_ratio
        self.epochs = epochs
        self.batch_size = batch_size

    def compute_gae(self, rewards, values, dones, gamma=0.99, lam=0.95):
        """Generalized Advantage Estimation over one trajectory.

        The value after the final step is taken to be 0.

        NOTE(review): ``gamma`` defaults to 0.99 here independently of
        ``self.gamma``; pass ``gamma=self.gamma`` if the two should agree.

        Returns:
            A float32 tensor of advantages, one per step.
        """
        advantages = []
        gae = 0.0
        next_value = 0.0  # bootstrap value beyond the last step
        # Walk backwards, append, then flip once; this avoids the quadratic
        # cost of inserting at the front of a list on every step.
        for t in reversed(range(len(rewards))):
            not_done = 1 - dones[t]
            delta = rewards[t] + gamma * next_value * not_done - values[t]
            gae = delta + gamma * lam * not_done * gae
            advantages.append(float(gae))
            next_value = values[t]
        advantages.reverse()
        return torch.tensor(advantages, dtype=torch.float32)

    def update(self, states, actions, old_log_probs, returns, advantages):
        """Run ``self.epochs`` clipped-surrogate gradient steps on one batch."""
        # Imported here so the method does not rely on an import made
        # elsewhere in the file.
        from torch.distributions import Categorical

        for _ in range(self.epochs):
            # Get current policy outputs
            probs, values = self.model(states)
            dist = Categorical(probs)
            log_probs = dist.log_prob(actions)
            entropy = dist.entropy().mean()
            # Ratio for PPO clipping
            ratio = torch.exp(log_probs - old_log_probs)
            # Clipped surrogate loss
            surr1 = ratio * advantages
            surr2 = torch.clamp(ratio, 1 - self.clip_ratio,
                                1 + self.clip_ratio) * advantages
            actor_loss = -torch.min(surr1, surr2).mean()
            # Critic loss. squeeze(-1) drops only the trailing value
            # dimension, so a batch of size 1 keeps its batch axis.
            critic_loss = nn.MSELoss()(values.squeeze(-1), returns)
            # Total loss with entropy bonus
            loss = actor_loss + 0.5 * critic_loss - 0.01 * entropy
            self.optimizer.zero_grad()
            loss.backward()
            nn.utils.clip_grad_norm_(self.model.parameters(), 0.5)
            self.optimizer.step()

# --- Multi-Agent RL ---
class MultiAgentEnv:
    """Bundle of ``n_agents`` independent environments stepped side by side."""

    def __init__(self, n_agents, env_fn):
        self.n_agents = n_agents
        # One environment per agent, each built by calling env_fn()
        self.envs = []
        for _ in range(n_agents):
            self.envs.append(env_fn())

    def reset(self):
        """Reset every environment and return the list of first observations."""
        first_observations = []
        for env in self.envs:
            reset_result = env.reset()
            first_observations.append(reset_result[0])
        return first_observations

    def step(self, actions):
        """Step each environment with its matching action.

        Returns:
            ``(observations, rewards, dones)`` as three parallel lists, where
            a done entry combines the 3rd and 4th items of the step result
            with ``or``.
        """
        transitions = [env.step(action)
                       for env, action in zip(self.envs, actions)]
        observations, rewards, dones = [], [], []
        for transition in transitions:
            observations.append(transition[0])
            rewards.append(transition[1])
            dones.append(transition[2] or transition[3])
        return observations, rewards, dones
class IndependentLearners:
    """Group of DQN agents that each learn on their own."""

    def __init__(self, n_agents, state_dim, action_dim):
        # Every agent gets its own, separately constructed DQNAgent
        self.agents = []
        for _ in range(n_agents):
            self.agents.append(DQNAgent(state_dim, action_dim))

    def get_actions(self, observations):
        """Ask each agent for an action given its own observation."""
        chosen = []
        for learner, obs in zip(self.agents, observations):
            chosen.append(learner.get_action(obs))
        return chosen

    def train(self):
        """Run one training step on every agent, in order."""
        for learner in self.agents:
            learner.train()

# --- Reward Shaping ---
def shape_reward(reward, state, next_state, done, info):
    """Return ``reward`` adjusted by optional shaping terms read from ``info``.

    Terms, applied in this order:
      * progress: ``0.1 * (x_position - prev_x)`` when ``'x_position'`` is
        present (``prev_x`` defaults to 0);
      * survival: ``+0.01`` while the episode is not done;
      * danger: ``-0.5`` when ``info['danger_zone']`` is truthy;
      * proximity: ``0.1 / (goal_distance + 1)`` when ``'goal_distance'`` is
        present.

    ``state`` and ``next_state`` are accepted but not used.
    """
    total = reward

    # Progress reward (encourage forward movement)
    if 'x_position' in info:
        previous_x = info.get('prev_x', 0)
        total += 0.1 * (info['x_position'] - previous_x)

    # Survival bonus
    if not done:
        total += 0.01

    # Penalty for dangerous states
    if info.get('danger_zone'):
        total -= 0.5

    # Goal proximity reward
    if 'goal_distance' in info:
        closeness = 1.0 / (info['goal_distance'] + 1)
        total += 0.1 * closeness

    return total
# Curriculum learning
class CurriculumEnv:
    """Wrapper that raises a difficulty level at scheduled episode counts.

    ``difficulty_schedule`` is any container supporting ``in``; each time the
    running episode count is found in it, the level goes up by one.
    """

    def __init__(self, base_env, difficulty_schedule):
        self.env, self.schedule = base_env, difficulty_schedule
        self.current_level = 0
        self.episode_count = 0

    def reset(self):
        """Count a new episode, maybe raise the level, then reset the wrapped env."""
        self.episode_count = self.episode_count + 1
        # Increase difficulty when this episode number is on the schedule
        reached_milestone = self.episode_count in self.schedule
        if reached_milestone:
            self.current_level = self.current_level + 1
            self._update_difficulty()
        return self.env.reset()

    def _update_difficulty(self):
        """Hook for changing environment parameters; does nothing here."""
        return None

# --- Stable Baselines3 (Production Ready) ---
from stable_baselines3 import PPO, DQN, A2C
from stable_baselines3.common.vec_env import DummyVecEnv, SubprocVecEnv
from stable_baselines3.common.callbacks import EvalCallback
# Vectorized environments for parallel training
def make_env():
    """Build one CartPole-v1 environment (factory for the vectorized env)."""
    return gym.make('CartPole-v1')


# Four copies of the environment, stepped together in the current process
env = DummyVecEnv([make_env] * 4)

# Train PPO agent
model = PPO(
    'MlpPolicy',
    env,
    # optimization
    learning_rate=3e-4,
    batch_size=64,
    n_epochs=10,
    # rollout and advantage estimation
    n_steps=2048,
    gamma=0.99,
    gae_lambda=0.95,
    # PPO clipping
    clip_range=0.2,
    # logging
    verbose=1,
    tensorboard_log="./ppo_logs/",
)

# Evaluation callback: runs 10 evaluation episodes on a separate environment
# every 1000 callback steps and saves the best model seen so far
eval_env = gym.make('CartPole-v1')
eval_callback = EvalCallback(
    eval_env,
    n_eval_episodes=10,
    eval_freq=1000,
    log_path='./logs/',
    best_model_save_path='./best_model/',
)

# Train
model.learn(total_timesteps=100000, callback=eval_callback)

# Save and load
model.save("ppo_cartpole")
model = PPO.load("ppo_cartpole")

# Inference with the reloaded model, always taking the deterministic action
obs = env.reset()
for _ in range(1000):
    action, _ = model.predict(obs, deterministic=True)
    obs, reward, done, info = env.step(action)

# --- Hyperparameter Tuning ---
# Common hyperparameter ranges
# Candidate values per setting, keyed by name. This dict is reference data
# only: nothing in this file reads it.
rl_hyperparameters = {
"learning_rate": [1e-4, 3e-4, 1e-3],
"gamma": [0.95, 0.99, 0.999],
"batch_size": [32, 64, 128, 256],
"n_steps": [128, 256, 512, 2048],
"clip_range": [0.1, 0.2, 0.3],
# NOTE(review): presumably corresponds to Stable Baselines3's `ent_coef` - confirm
"entropy_coef": [0.0, 0.01, 0.05],
# NOTE(review): presumably per-layer hidden widths of the policy network - confirm
"hidden_sizes": [(64, 64), (128, 128), (256, 256)]
}
# Optuna tuning
import optuna


def objective(trial):
    """Train PPO with trial-suggested hyperparameters and return its mean reward.

    Args:
        trial: Optuna trial used to sample the learning rate (log scale),
            the discount factor and the rollout length.

    Returns:
        Mean episode reward over 10 evaluation episodes on ``eval_env``.
    """
    # Imported here because the file never imports evaluate_policy at the
    # top level; without this the call below raises NameError.
    from stable_baselines3.common.evaluation import evaluate_policy

    lr = trial.suggest_float('lr', 1e-5, 1e-2, log=True)
    gamma = trial.suggest_float('gamma', 0.9, 0.9999)
    n_steps = trial.suggest_int('n_steps', 128, 2048, step=128)
    model = PPO('MlpPolicy', env, learning_rate=lr,
                gamma=gamma, n_steps=n_steps)
    model.learn(total_timesteps=50000)
    # Evaluate. evaluate_policy returns (mean_reward, std_reward); Optuna
    # needs a single float, so only the mean is returned.
    mean_reward, _std_reward = evaluate_policy(model, eval_env,
                                               n_eval_episodes=10)
    return mean_reward


study = optuna.create_study(direction='maximize')
study.optimize(objective, n_trials=50)

# --- Common Issues & Solutions ---
Issue: Training instability
Solutions:
- Reduce learning rate
- Increase batch size
- Use gradient clipping
- Normalize observations and rewards
- Use proper random seeds
Issue: Poor exploration
Solutions:
- Increase epsilon/entropy
- Use curiosity-driven exploration
- Add noise to actions (Gaussian, OU)
- Use count-based exploration bonus
Issue: Reward hacking
Solutions:
- Careful reward design
- Use sparse rewards when possible
- Test with adversarial evaluation
- Monitor for unexpected behaviors
Best Practices
- Environment: Verify env correctness before training
- Normalization: Normalize states and rewards
- Logging: Track episode rewards, lengths, losses
- Reproducibility: Set seeds for all random sources
- Evaluation: Separate eval environment, many episodes
- Hyperparameters: Start with known good defaults
- Baseline: Compare against random policy