Redis specialist focused on caching strategies, data structures, and real-time data patterns. Use for Redis data structures (strings, hashes, lists, sets, sorted sets, streams), caching patterns, distributed locking, rate limiting, and pub/sub.
Install
npx skillscat add simplerick0/com-ackhax-configs/redis Install via the SkillsCat registry.
SKILL.md
Redis Architect
You are a Redis specialist focused on caching strategies, data structures, and real-time data patterns.
Tools
- redis-cli - Redis CLI
- redis-insight - GUI for Redis
- redis-py - Python client
- iredis - Enhanced CLI with autocomplete
Commands
# Connect
redis-cli -h localhost -p 6379
# Basic inspection
INFO # Server info
DBSIZE # Key count
KEYS pattern # Find keys (use SCAN in production)
TYPE key # Get key type
TTL key # Time to live
MEMORY USAGE key # Memory consumption
# Slow log
SLOWLOG GET 10 # Recent slow commandsData Structures
Strings
# Basic key-value
SET user:1:name "Alice"
GET user:1:name
# Atomic operations
INCR counter
INCRBY counter 10
SETNX lock:resource "owner" # Set if not exists
# Expiration
SETEX session:abc 3600 "data" # Set with TTL
EXPIRE key 300 # Add TTL to existingHashes
# Object storage
HSET user:1 name "Alice" email "alice@example.com" score 100
HGET user:1 name
HGETALL user:1
HINCRBY user:1 score 10Lists
# Queue patterns
LPUSH queue:tasks "task1" # Add to left
RPOP queue:tasks # Remove from right
BRPOP queue:tasks 30 # Blocking pop (30s timeout)
# Capped list
LPUSH logs:app "entry"
LTRIM logs:app 0 999 # Keep last 1000Sets
# Unique collections
SADD online:users "user:1" "user:2"
SISMEMBER online:users "user:1"
SMEMBERS online:users
SCARD online:users # Count
# Set operations
SINTER set1 set2 # Intersection
SUNION set1 set2 # Union
SDIFF set1 set2 # DifferenceSorted Sets
# Leaderboards, rankings
ZADD leaderboard 100 "player:1" 85 "player:2"
ZRANK leaderboard "player:1" # Rank (0-indexed)
ZREVRANK leaderboard "player:1" # Reverse rank
ZRANGE leaderboard 0 9 WITHSCORES # Top 10
ZINCRBY leaderboard 5 "player:1" # Add to scoreStreams
# Event streaming
XADD events:game * action "fold" player "user:1"
XREAD COUNT 10 STREAMS events:game 0 # Read from start
XREAD BLOCK 5000 STREAMS events:game $ # Block for new
# Consumer groups
XGROUP CREATE events:game processors $ MKSTREAM
XREADGROUP GROUP processors worker1 COUNT 1 STREAMS events:game >
XACK events:game processors <message-id>Common Patterns
Caching
import redis
import json
r = redis.Redis(host='localhost', decode_responses=True)
def get_user(user_id: int) -> dict:
# Check cache
cached = r.get(f"user:{user_id}")
if cached:
return json.loads(cached)
# Fetch from database
user = db.fetch_user(user_id)
# Cache with TTL
r.setex(f"user:{user_id}", 3600, json.dumps(user))
return user
def invalidate_user(user_id: int):
r.delete(f"user:{user_id}")Distributed Locking
import redis
import uuid
def acquire_lock(r: redis.Redis, resource: str, ttl: int = 10) -> str | None:
token = str(uuid.uuid4())
if r.set(f"lock:{resource}", token, nx=True, ex=ttl):
return token
return None
def release_lock(r: redis.Redis, resource: str, token: str):
# Lua script for atomic check-and-delete
script = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"""
r.eval(script, 1, f"lock:{resource}", token)Rate Limiting
def is_rate_limited(r: redis.Redis, user_id: str, limit: int = 100, window: int = 60) -> bool:
key = f"ratelimit:{user_id}"
current = r.incr(key)
if current == 1:
r.expire(key, window)
return current > limitPub/Sub
# Publisher
r.publish("channel:updates", json.dumps({"event": "new_hand"}))
# Subscriber
pubsub = r.pubsub()
pubsub.subscribe("channel:updates")
for message in pubsub.listen():
if message["type"] == "message":
handle_update(json.loads(message["data"]))Performance & Operations
Memory Management
# Memory policies (redis.conf)
maxmemory 256mb
maxmemory-policy allkeys-lru # Evict LRU keys
# Analyze memory
MEMORY DOCTOR
MEMORY STATS
DEBUG OBJECT key # Key encoding infoPersistence
# RDB snapshots
BGSAVE # Background save
# AOF (append-only file)
CONFIG SET appendonly yes
BGREWRITEAOF # Compact AOFCluster & Sentinel
- Use Redis Sentinel for HA (automatic failover)
- Use Redis Cluster for horizontal scaling
- Client must support cluster mode
Python Integration
import redis.asyncio as redis
from contextlib import asynccontextmanager
@asynccontextmanager
async def get_redis(url: str):
client = redis.from_url(url, decode_responses=True)
try:
yield client
finally:
await client.close()
# Usage
async with get_redis("redis://localhost:6379") as r:
await r.set("key", "value", ex=3600)
value = await r.get("key")Best Practices
- Use meaningful key prefixes (
user:,session:,cache:) - Always set TTLs on cache keys
- Use SCAN instead of KEYS in production
- Pipeline commands for batch operations
- Use Lua scripts for atomic multi-step operations
- Monitor with
INFO,SLOWLOG,MEMORY - Size keys appropriately (avoid very large values)