Batch create Linear tickets from a plan markdown file - parses phases/milestones, creates epic if needed, links dependencies
Install
npx skillscat add omninode-ai/omniclaude/plan-to-tickets Install via the SkillsCat registry.
Batch Create Tickets from Plan
Anti-Pattern Preamble (Mandatory)
Before creating any tickets, enforce these five rules. Violations are hard failures --
the skill MUST refuse to proceed and report which rule was violated.
No cross-repo tickets without
--allow-arch-violation.
If any entry's description references files in a different repo (detected by path prefix
or explicit repo label mismatch), fail fast with:"Cross-repo dependency detected in entry '{title}'. Use --allow-arch-violation to override."No duplicate phase IDs.
Ifdetect_structure()finds two entries mapping to the same internal ID (e.g., two## Phase 1:headings), fail fast. Do not silently rename or merge them.No circular dependencies.
After resolving allP#andOMN-####references, build a dependency graph and check
for cycles. If a cycle exists, fail fast with the cycle path:"Circular dependency detected: P1 -> P3 -> P1. Fix the plan before creating tickets."No empty content tickets.
If an entry has an empty or whitespace-only content block, fail fast:"Entry '{title}' has no content. Every ticket needs a description."No unlabeled repo when architecture validation is enabled.
If--repois not provided but the plan contains externalOMN-####dependencies,
warn and skip architecture validation (do not silently create cross-repo tickets).
Print:"Warning: --repo not specified. Architecture validation skipped for external deps."
These rules are enforced in Step 2 (after structure detection) and Step 7.5 (architecture
validation). The skill MUST check all five before proceeding to ticket creation.
Usage: /plan-to-tickets <plan-file> [options]
Arguments:
plan-file- Path to plan markdown file (required)--project <name>- Linear project name--epic-title <title>- Title for epic (overrides auto-detection from plan)--no-create-epic- Fail if epic doesn't exist (don't auto-create)--dry-run- Show what would be created without creating--skip-existing- Skip tickets that already exist (don't ask)--team <name>- Linear team name (default: Omninode)--repo <label>- Repository label for all tickets (e.g., omniclaude, omnibase_core)--allow-arch-violation- Bypass architecture dependency validation
Create Linear tickets from a plan markdown file. Parses phases or milestones, creates/links epic, resolves dependencies.
Announce at start: "Creating tickets from plan: {plan-file}"
Step 1: Read and Validate Plan File
from pathlib import Path
def read_plan_file(path: str) -> str:
"""Read plan file and validate it exists."""
plan_path = Path(path).expanduser()
if not plan_path.exists():
raise FileNotFoundError(f"Plan file not found: {path}")
return plan_path.read_text(encoding='utf-8')If file doesn't exist, report error and stop.
Step 2: Detect Plan Structure
Detection Cascade:
- If
## Task N:sections exist → use them (task_sections, canonical) - Else if
## Phase N:sections exist → use them (phase_sections, legacy alias) - Else if generic numbered
## N.headings exist → use them (numbered_h2) - Else if
## Step N:headings exist → use them (step_sections) - Else if flat checklist items exist → use them (
flat_tasks)
If none match → fail fast: "Plans must use ## Task N: headings. Use design-to-plan."
import re
def detect_structure(content: str) -> tuple[str, list[dict]]:
"""Detect plan structure and extract entries.
Returns:
(structure_type, entries) where structure_type is one of:
'task_sections' — ## Task N: Title headings (canonical)
'phase_sections' — ## Phase N: Title headings (legacy alias)
'numbered_h2' — generic ## N. Title headings
'step_sections' — ## Step N: Title headings
'flat_tasks' — flat checklist items
entries is list of {id, title, content, dependencies}
"""
def _extract_numbered_entries(matches, keyword: str, structure_type: str):
"""Shared extraction logic for ## Task N: and ## Phase N: headings."""
entries = []
for i, match in enumerate(matches):
num = match.group(1)
# Normalize: 1.5 -> 1_5 for valid ID
entry_id = num.replace('.', '_')
title = match.group(2).strip()
# Extract content until next ## heading or end
start = match.end()
if i + 1 < len(matches):
end = matches[i + 1].start()
else:
next_h2 = re.search(r'^## ', content[start:], re.MULTILINE)
end = start + next_h2.start() if next_h2 else len(content)
entry_content = content[start:end].strip()
deps = parse_dependencies(entry_content)
entries.append({
'id': f'P{entry_id}',
'title': f'{keyword} {num}: {title}',
'content': entry_content,
'dependencies': deps
})
# Check for duplicate IDs - fail fast to prevent wrong dependency linking
seen_ids = {}
for entry in entries:
if entry['id'] in seen_ids:
raise ValueError(
f"Duplicate {keyword.lower()} ID '{entry['id']}' found. "
f"First: '{seen_ids[entry['id']]}', Second: '{entry['title']}'. "
f"Fix plan file to use unique numbers."
)
seen_ids[entry['id']] = entry['title']
return (structure_type, entries)
# Try Task sections first (canonical)
# Captures decimal tasks: "## Task 1.5: Title" -> num = "1.5"
task_pattern = r'^## Task\s+(\d+(?:\.\d+)?):\s*(.+?)$'
task_matches = list(re.finditer(task_pattern, content, re.MULTILINE | re.IGNORECASE))
if task_matches:
return _extract_numbered_entries(task_matches, 'Task', 'task_sections')
# Try Phase sections (legacy alias — same behavior, different keyword)
phase_pattern = r'^## Phase\s+(\d+(?:\.\d+)?):\s*(.+?)$'
phase_matches = list(re.finditer(phase_pattern, content, re.MULTILINE | re.IGNORECASE))
if phase_matches:
return _extract_numbered_entries(phase_matches, 'Phase', 'phase_sections')
# Try Milestone table (legacy fallback)
# Expected format: | **M1** | Deliverable | Dependencies |
# Must have exactly 3 columns: ID, description, dependencies
if '## Milestones Overview' in content or '## Milestone Overview' in content:
# Find table rows with **M#** pattern
table_pattern = r'\|\s*\*\*M(\d+)\*\*\s*\|([^|]+)\|([^|]*)\|'
table_matches = re.findall(table_pattern, content)
if table_matches:
entries = []
for m_num, deliverable, deps_str in table_matches:
# Find corresponding ## Milestone N: section for content
section_pattern = rf'^## (?:Milestone\s+)?{m_num}:\s*(.+?)$'
section_match = re.search(section_pattern, content, re.MULTILINE | re.IGNORECASE)
if section_match:
title = section_match.group(1).strip()
# Extract content
start = section_match.end()
next_h2 = re.search(r'^## ', content[start:], re.MULTILINE)
end = start + next_h2.start() if next_h2 else len(content)
m_content = content[start:end].strip()
else:
title = deliverable.strip()
m_content = deliverable.strip()
deps = parse_dependency_string(deps_str)
entries.append({
'id': f'P{m_num}', # Normalize to P# internally
'title': f'M{m_num}: {title}',
'content': m_content,
'dependencies': deps
})
return ('milestone_table', entries)
# Try priority labels (third fallback)
# Matches bold P-prefixed labels used in ## Implementation Priority sections:
# **P0 — title**: description...
# **P0 — title** description...
# The em-dash (—), en-dash (–), or hyphen (-) are all accepted.
priority_pattern = r'^\*\*P(\d+)\s*[—\-–]\s*(.+?)(?:\*\*:|\*\*\s)'
priority_matches = list(re.finditer(priority_pattern, content, re.MULTILINE))
if not priority_matches:
# Broader fallback: **P0 — anything** at start of line
priority_pattern = r'^\*\*P(\d+)\s*[—\-–]\s*([^\n*]+?)(?:\*\*|$)'
priority_matches = list(re.finditer(priority_pattern, content, re.MULTILINE))
if priority_matches:
entries = []
for i, match in enumerate(priority_matches):
p_num = match.group(1)
title = match.group(2).strip().rstrip(':').strip()
# Content: from start of label line to next P-label or ## heading
label_start = match.start()
content_start = match.end()
if i + 1 < len(priority_matches):
end = priority_matches[i + 1].start()
else:
next_h2 = re.search(r'^## ', content[content_start:], re.MULTILINE)
end = content_start + next_h2.start() if next_h2 else len(content)
# Include the label line for full context in the ticket description
p_content = content[label_start:end].strip()
deps = parse_dependencies(p_content)
entries.append({
'id': f'P{p_num}',
'title': f'P{p_num}: {title}',
'content': p_content,
'dependencies': deps
})
# Check for duplicate IDs - fail fast
seen_ids = {}
for entry in entries:
if entry['id'] in seen_ids:
raise ValueError(
f"Duplicate priority ID '{entry['id']}' found. "
f"First: '{seen_ids[entry['id']]}', Second: '{entry['title']}'. "
f"Fix plan file to use unique priority numbers."
)
seen_ids[entry['id']] = entry['title']
return ('priority_labels', entries)
# No valid structure found - fail fast
return ('none', [])
def parse_dependencies(content: str) -> list[str]:
"""Extract dependencies from content block."""
# Look for "Dependencies:", "Depends on:", "Blocked by:" lines
dep_patterns = [
r'(?:Dependencies|Depends on|Blocked by|Requires):\s*(.+?)(?:\n|$)',
r'\*\*Dependencies?\*\*:\s*(.+?)(?:\n|$)',
]
for pattern in dep_patterns:
match = re.search(pattern, content, re.IGNORECASE)
if match:
return parse_dependency_string(match.group(1))
return []
def parse_dependency_string(deps_str: str) -> list[str]:
"""Parse dependency string into normalized list.
Supports: Phase 1, M1, Milestone 1, P1, OMN-1234, None
Normalizes to: P1, P2, OMN-1234 format
"""
if not deps_str or deps_str.strip().lower() in ('none', 'n/a', '-', ''):
return []
deps = []
# Split on commas, "and", semicolons
parts = re.split(r'[,;&]|\band\b', deps_str)
for part in parts:
part = part.strip()
if not part:
continue
# OMN-#### ticket IDs - pass through
omn_match = re.match(r'(OMN-\d+)', part, re.IGNORECASE)
if omn_match:
deps.append(omn_match.group(1).upper())
continue
# Phase N, Phase 1.5, etc -> P{N} or P{N.N}
phase_match = re.match(r'Phase\s+(\d+(?:\.\d+)?)', part, re.IGNORECASE)
if phase_match:
# Normalize: P1.5 -> P1_5 for valid ID, or just use integer part
phase_id = phase_match.group(1).replace('.', '_')
deps.append(f'P{phase_id}')
continue
# M# or Milestone # -> P{N} (requires M prefix or "Milestone" word)
m_match = re.match(r'(?:Milestone\s+|M)(\d+)', part, re.IGNORECASE)
if m_match:
deps.append(f'P{m_match.group(1)}')
continue
# P# -> P{N} (also handles P1_5 or P1.5 decimal variants)
p_match = re.match(r'P(\d+(?:[._]\d+)?)', part, re.IGNORECASE)
if p_match:
# Normalize to underscore format for consistency
phase_id = p_match.group(1).replace('.', '_')
deps.append(f'P{phase_id}')
continue
return depsStep 3: Extract Epic Title
def extract_epic_title(content: str, override: str | None = None) -> str:
"""Extract epic title from plan or use override."""
if override:
return override
# Find first # heading
match = re.search(r'^# (.+?)$', content, re.MULTILINE)
if match:
return match.group(1).strip()
raise ValueError("No epic title found. Provide --epic-title or add a # heading to the plan.")Step 4: Resolve or Create Epic
def resolve_epic(epic_title: str, team: str, no_create: bool, project: str | None, dry_run: bool = False) -> dict | None:
"""Find existing epic or create new one.
Returns:
Epic issue dict with 'id' and 'identifier', or None if --no-create-epic and not found
"""
# Search for existing epic by title
issues = mcp__linear-server__list_issues(
query=epic_title,
team=team,
limit=50
)
# Filter for exact title matches (case-insensitive)
matches = [
i for i in issues.get('issues', [])
if i.get('title', '').lower().strip() == epic_title.lower().strip()
]
if len(matches) == 1:
# Single match - auto-link
epic = matches[0]
print(f"Found existing epic: {epic['identifier']} - {epic['title']}")
return epic
if len(matches) > 1:
# Multiple matches - ask user to disambiguate
options = [
{"label": f"{m['identifier']}: {m['title'][:40]}", "description": m.get('status', '')}
for m in matches[:10] # Limit to 10 for reasonable UI
]
response = AskUserQuestion(
questions=[{
"question": f"Multiple epics match '{epic_title}'. Which one?",
"header": "Epic",
"options": options,
"multiSelect": False
}]
)
selected = response.get('answers', {}).get('Epic')
if selected:
# Parse identifier from selected label
identifier = selected.split(':')[0]
for m in matches:
if m['identifier'] == identifier:
return m
raise ValueError("No epic selected. Aborting.")
# No matches - create new epic (unless --no-create-epic)
if no_create:
raise ValueError(f"No epic found matching '{epic_title}' and --no-create-epic was set.")
if dry_run:
print(f"[DRY RUN] Would create new epic: {epic_title}")
return {'id': 'DRY-EPIC', 'identifier': 'DRY-EPIC', 'title': epic_title, '_dry_run': True}
print(f"Creating new epic: {epic_title}")
params = {
"title": epic_title,
"team": team,
"description": f"Epic created from plan file.\n\n**Auto-generated by /plan-to-tickets**"
}
if project:
params["project"] = project
try:
epic = mcp__linear-server__create_issue(**params)
print(f"Created epic: {epic.get('identifier', 'unknown')} - {epic.get('title', epic_title)}")
return epic
except Exception as e:
raise ValueError(f"Failed to create epic '{epic_title}': {e}")Step 5: Build Ticket Descriptions
def build_ticket_description(entry: dict, structure_type: str, arch_violation_override: bool = False) -> str:
"""Build standardized ticket description from plan entry.
Args:
entry: Plan entry with title, content, dependencies
structure_type: Type of plan structure (phase_sections or milestone_table)
arch_violation_override: If True, add warning about architecture validation bypass
"""
lines = []
# Add architecture violation warning if applicable
if arch_violation_override:
lines.append("> **Architecture Override**: This ticket was created with `--allow-arch-violation` - cross-application dependency requires justification.\n")
lines.append("")
lines.append("## Summary\n")
lines.append(entry['content'][:500] if entry['content'] else f"Implementation for: {entry['title']}")
lines.append("")
lines.append(f"**Source**: Plan file ({structure_type})")
if entry['dependencies']:
lines.append(f"**Dependencies**: {', '.join(entry['dependencies'])}")
lines.append("")
# Include full content if longer
if len(entry['content']) > 500:
lines.append("## Details\n")
lines.append(entry['content'])
lines.append("")
lines.append("## Definition of Done\n")
lines.append("- [ ] Requirements implemented")
lines.append("- [ ] Tests added/updated")
lines.append("- [ ] Code reviewed")
lines.append("- [ ] Documentation updated (if applicable)")
return "\n".join(lines)Step 6: Check for Existing Tickets
def check_existing_ticket(title: str, team: str) -> dict | None:
"""Check if ticket with same title already exists."""
issues = mcp__linear-server__list_issues(
query=title,
team=team,
limit=50
)
for issue in issues.get('issues', []):
if issue.get('title', '').lower().strip() == title.lower().strip():
return issue
return NoneStep 7: Handle Conflicts
def handle_conflict(existing: dict, entry: dict, skip_existing: bool) -> str:
"""Handle ticket conflict. Returns action: 'update', 'skip', 'create_new', or 'abort'."""
if skip_existing:
return 'skip'
response = AskUserQuestion(
questions=[{
"question": f"Ticket '{existing['identifier']}' already exists with title '{existing['title'][:40]}...'. How to proceed?",
"header": "Conflict",
"options": [
{"label": "Skip", "description": "Don't create this ticket"},
{"label": "Update existing", "description": "Merge description into existing ticket"},
{"label": "Create new", "description": "Create duplicate with new ID"}
],
"multiSelect": False
}]
)
answer = response.get('answers', {}).get('Conflict', 'Skip')
if 'Skip' in answer:
return 'skip'
elif 'Update' in answer:
return 'update'
elif 'Create' in answer:
return 'create_new'
return 'skip'Step 7.5: Validate Architecture Dependencies
Before creating any tickets, validate that all external dependencies (OMN-#### references) respect the OmniNode architecture.
Reference: See plugins/onex/lib/dependency_validator.md for validation logic.
# Validation runs before batch creation
from lib.dependency_validator import validate_dependencies, filter_errors, filter_warnings, FOUNDATION_REPOS
def validate_plan_dependencies(
entries: list[dict],
plan_repo: str | None,
allow_override: bool,
dry_run: bool
) -> tuple[bool, bool]:
"""Validate all external dependencies in the plan.
Args:
entries: List of plan entries with 'dependencies' field
plan_repo: Repository label for all tickets in this plan
allow_override: If True, warn but don't block
dry_run: If True, just report what would be validated
Returns:
Tuple of (should_proceed, violations_overridden):
- should_proceed: True if validation passes (or override set), False if should abort
- violations_overridden: True if violations were found but overridden with --allow-arch-violation
"""
if not plan_repo:
print("Warning: No --repo specified. Skipping architecture validation.")
print(" Provide --repo to enable dependency validation.")
return (True, False) # proceed, no violations overridden
# Collect all external dependencies (OMN-#### format)
external_deps = []
for entry in entries:
for dep in entry.get('dependencies', []):
if dep.startswith('OMN-') and dep not in external_deps:
external_deps.append(dep)
if not external_deps:
return (True, False) # proceed, no violations overridden
if dry_run:
print(f"\n[DRY RUN] Would validate {len(external_deps)} external dependencies:")
for dep in external_deps:
print(f" - {dep}")
return (True, False) # proceed, no violations overridden (can't know in dry run)
# Validate each external dependency
all_errors = []
all_warnings = []
for dep in external_deps:
violations = validate_dependencies(
ticket_repo=plan_repo,
blocked_by_ids=[dep],
fetch_ticket_fn=lambda id: mcp__linear-server__get_issue(id=id)
)
errors = filter_errors(violations)
warnings = filter_warnings(violations)
all_errors.extend(errors)
all_warnings.extend(warnings)
# Report warnings
for w in all_warnings:
print(f"[WARNING] {w.message}")
# Handle errors
if all_errors:
print(f"\nArchitecture violations detected ({len(all_errors)}):\n")
for err in all_errors:
print(f" - {err.message}\n")
if allow_override:
print("[WARNING] Proceeding with architecture violations (--allow-arch-violation)\n")
return (True, True) # proceed, violations WERE overridden
else:
print("Valid dependencies flow: app->foundation or foundation->foundation.")
print("To proceed anyway, use --allow-arch-violation flag.")
print("\nNo tickets created. Fix dependencies or use override flag.")
return (False, False) # abort, no override
return (True, False) # proceed, no violations to override
# Call before Step 8
plan_repo = args.repo # May be None if not specified
should_proceed, arch_violation_override = validate_plan_dependencies(
entries=entries,
plan_repo=plan_repo,
allow_override=args.allow_arch_violation,
dry_run=args.dry_run
)
if not should_proceed:
raise SystemExit(1)
# arch_violation_override is True if violations were found but --allow-arch-violation was used
# This flag will be passed to create_tickets_batch to annotate tickets with warningsKey behavior:
- Internal refs (P1, P2, M1): Not validated (same plan = same repo)
- External refs (OMN-1234): Validated against architecture rules
- No --repo: Validation skipped with warning
- --dry-run: Shows what would be validated without API calls
- Violations found: Entire batch aborted (unless --allow-arch-violation)
Step 8: Create Tickets in Batch
def create_tickets_batch(
entries: list[dict],
epic: dict | None,
team: str,
project: str | None,
structure_type: str,
skip_existing: bool,
dry_run: bool,
arch_violation_override: bool = False
) -> dict:
"""Create all tickets from plan entries.
Args:
entries: List of plan entries to create tickets for
epic: Parent epic issue, or None
team: Linear team name
project: Linear project name, or None
structure_type: Type of plan structure detected
skip_existing: If True, skip existing tickets without asking
dry_run: If True, don't actually create tickets
arch_violation_override: If True, add warning annotation to ticket descriptions
Returns:
{created: [], skipped: [], updated: [], failed: [], id_map: {P1: OMN-xxx}}
"""
import time # For rate limiting between API calls
# Linear has ~65KB description limit - define once for consistency
MAX_DESC_SIZE = 60000 # Leave margin for safety
results = {
'created': [],
'skipped': [],
'updated': [],
'failed': [],
'id_map': {} # Maps P1 -> OMN-1234 for dependency resolution
}
for entry in entries:
print(f"\nProcessing: {entry['title']}")
# Check for existing
existing = check_existing_ticket(entry['title'], team)
if existing:
action = handle_conflict(existing, entry, skip_existing)
if action == 'skip':
results['skipped'].append({'entry': entry, 'existing': existing})
results['id_map'][entry['id']] = existing['identifier']
print(f" Skipped (exists): {existing['identifier']}")
continue
if action == 'update':
if not dry_run:
description = build_ticket_description(entry, structure_type, arch_violation_override)
existing_desc = existing.get('description', '') or ''
merged = f"{existing_desc}\n\n---\n\n## Updated from Plan\n\n{description}"
if len(merged) > MAX_DESC_SIZE:
merged = merged[:MAX_DESC_SIZE] + "\n\n[... truncated due to size limit]"
mcp__linear-server__update_issue(
id=existing['id'],
description=merged
)
results['updated'].append({'entry': entry, 'existing': existing})
results['id_map'][entry['id']] = existing['identifier']
print(f" Updated: {existing['identifier']}")
continue
# Create new ticket
description = build_ticket_description(entry, structure_type, arch_violation_override)
if len(description) > MAX_DESC_SIZE:
description = description[:MAX_DESC_SIZE] + "\n\n[... truncated due to size limit]"
# Resolve dependencies to actual ticket IDs (forward refs resolved in second pass)
blocked_by = []
unresolved_deps = []
for dep in entry['dependencies']:
if dep.startswith('OMN-'):
blocked_by.append(dep)
elif dep in results['id_map']:
blocked_by.append(results['id_map'][dep])
elif re.match(r'^P\d+(?:_\d+)?$', dep):
# Forward reference (P# format) - will be resolved in second pass
unresolved_deps.append(dep)
else:
# Unrecognized dependency format - warn user
print(f" Warning: Dependency '{dep}' has unrecognized format (expected OMN-###, P#, Phase #, or M#)")
if unresolved_deps:
# Store for second pass resolution
entry['_unresolved_deps'] = unresolved_deps
if dry_run:
print(f" [DRY RUN] Would create: {entry['title']}")
if blocked_by:
print(f" Dependencies: {blocked_by}")
results['created'].append({'entry': entry, 'dry_run': True})
results['id_map'][entry['id']] = f"DRY-{entry['id']}"
continue
# Rate limiting: small delay between API calls to avoid hitting Linear rate limits
time.sleep(0.2) # 200ms delay = max 5 requests/second, well under Linear limits
try:
params = {
"title": entry['title'],
"team": team,
"description": description
}
if project:
params["project"] = project
if epic:
params["parentId"] = epic['id']
if blocked_by:
params["blockedBy"] = blocked_by
result = mcp__linear-server__create_issue(**params)
results['created'].append({
'entry': entry,
'ticket': result,
'first_pass_blocked_by': blocked_by # Store for merge in second pass
})
results['id_map'][entry['id']] = result['identifier']
print(f" Created: {result.get('identifier', 'unknown')} - {result.get('url', '(no URL)')}")
except Exception as e:
results['failed'].append({'entry': entry, 'error': str(e)})
print(f" Failed: {e}")
# Second pass: resolve forward dependencies
for item in results['created']:
if item.get('dry_run'):
continue
entry = item['entry']
unresolved = entry.get('_unresolved_deps', [])
if not unresolved:
continue
# Resolve forward references now that all tickets exist
new_blocked_by = []
for dep in unresolved:
if dep in results['id_map']:
new_blocked_by.append(results['id_map'][dep])
else:
print(f" Warning: Unresolved dependency '{dep}' for {item['ticket']['identifier']}")
if new_blocked_by and not dry_run:
try:
# Merge with first-pass dependencies (Linear replaces, doesn't merge)
# Only include first-pass deps that exist in id_map (validated IDs)
first_pass = item.get('first_pass_blocked_by', [])
id_map_values = set(results['id_map'].values())
validated_first_pass = [
dep for dep in first_pass
if dep.startswith('OMN-') or dep in id_map_values
]
all_blocked_by = validated_first_pass + new_blocked_by
# Update ticket with combined dependencies
mcp__linear-server__update_issue(
id=item['ticket']['id'],
blockedBy=all_blocked_by
)
print(f" Linked forward deps for {item['ticket']['identifier']}: {new_blocked_by}")
except Exception as e:
print(f" Warning: Failed to link forward deps for {item['ticket']['identifier']}: {e}")
return resultsStep 9: Report Summary
def report_summary(results: dict, epic: dict | None, structure_type: str, dry_run: bool):
"""Print final summary."""
mode = "[DRY RUN] " if dry_run else ""
print(f"\n{'='*60}")
print(f"{mode}Plan to Tickets Summary")
print(f"{'='*60}")
if epic:
print(f"\nEpic: {epic.get('identifier', 'unknown')} - {epic.get('title', 'untitled')}")
print(f"Structure detected: {structure_type}")
print(f"\nResults:")
print(f" Created: {len(results['created'])}")
print(f" Skipped: {len(results['skipped'])}")
print(f" Updated: {len(results['updated'])}")
print(f" Failed: {len(results['failed'])}")
if results['created']:
print(f"\n### Created Tickets")
for item in results['created']:
if item.get('dry_run'):
print(f" - [DRY] {item['entry']['title']}")
else:
t = item['ticket']
print(f" - [{t.get('identifier', '?')}]({t.get('url', '#')}) - {t.get('title', '')[:50]}")
if results['skipped']:
print(f"\n### Skipped (already exist)")
for item in results['skipped']:
e = item['existing']
print(f" - {e['identifier']} - {e['title'][:50]}")
if results['failed']:
print(f"\n### Failed")
for item in results['failed']:
print(f" - {item['entry']['title']}: {item['error']}")Post-Creation: Generate Contracts (every ticket)
After the summary is printed, call generate-ticket-contract for every created (or skipped/updated)
ticket. Do NOT filter by seam-keyword — call for every ticket. The generator handles seam detection
internally and returns cheaply for non-seam tickets.
def generate_contracts_for_all(results: dict, dry_run: bool) -> list[dict]:
"""Call generate-ticket-contract for every ticket in results.
Args:
results: Output from create_tickets_batch
dry_run: If True, report what would be generated without calling the skill
Returns:
List of contract results: {ticket_id, status, is_seam, path_or_error}
"""
import os
import time
from pathlib import Path
# Auto-detect onex_change_control repo if ONEX_CC_REPO_PATH not already set.
# Respects any explicit override already in the environment.
# Candidate search order (first existing directory wins):
# 1. /Volumes/PRO-G40/Code/omni_home/onex_change_control (canonical mount) # local-path-ok
# 2. ~/Code/omni_home/onex_change_control (home-relative)
# 3. Any sibling named 'onex_change_control' under omni_home parents # walk up CWD
if not os.environ.get('ONEX_CC_REPO_PATH'):
_candidates = [
Path('/Volumes/PRO-G40/Code/omni_home/onex_change_control'), # local-path-ok
Path.home() / 'Code' / 'omni_home' / 'onex_change_control',
]
# Walk up from cwd looking for a sibling onex_change_control dir
for _parent in Path.cwd().parents:
_probe = _parent / 'onex_change_control'
if _probe.is_dir():
_candidates.insert(0, _probe)
break
for _candidate in _candidates:
if _candidate.is_dir():
os.environ['ONEX_CC_REPO_PATH'] = str(_candidate)
if not dry_run:
(_candidate / 'contracts').mkdir(parents=True, exist_ok=True)
print(f'[contracts] Auto-detected ONEX_CC_REPO_PATH={_candidate}')
break
else:
print(
'Warning: ONEX_CC_REPO_PATH not set and onex_change_control not found '
'at standard paths — contracts will be printed inline for manual commit.'
)
all_tickets = []
for item in results['created']:
if item.get('dry_run'):
t_id = f"DRY-{item['entry']['id']}"
title = item['entry']['title']
else:
t = item['ticket']
t_id = t.get('identifier', 'UNKNOWN')
title = t.get('title', item['entry']['title'])
all_tickets.append({'ticket_id': t_id, 'title': title})
for item in results['skipped']:
e = item['existing']
all_tickets.append({'ticket_id': e['identifier'], 'title': e['title']})
for item in results['updated']:
e = item['existing']
all_tickets.append({'ticket_id': e['identifier'], 'title': e['title']})
contract_results = []
for t in all_tickets:
if dry_run:
contract_results.append({
'ticket_id': t['ticket_id'],
'status': 'dry_run',
'is_seam': None,
'path_or_error': '[DRY RUN] Would call generate-ticket-contract'
})
continue
time.sleep(0.1) # Small delay between calls
try:
# Call generate-ticket-contract skill
result = Skill(
skill="onex:generate-ticket-contract",
args=f"{t['ticket_id']}"
)
contract_results.append({
'ticket_id': t['ticket_id'],
'status': result.get('status', 'unknown'),
'is_seam': result.get('is_seam_ticket', False),
'path_or_error': result.get('contract_path', result.get('error', ''))
})
except Exception as e:
contract_results.append({
'ticket_id': t['ticket_id'],
'status': 'error',
'is_seam': None,
'path_or_error': str(e)
})
return contract_resultsOutput a Generated Contracts table after calling the generator for all tickets:
### Generated Contracts
| Ticket | Seam? | Contract Status | Path |
|--------|-------|-----------------|------|
| OMN-XXXX | yes | valid | contracts/OMN-XXXX.yaml (auto-written) |
| OMN-YYYY | no | valid | contracts/OMN-YYYY.yaml (auto-written) |
| OMN-ZZZZ | no | error | ValidationError: missing field 'requirements' |Key rule: Call generate-ticket-contract for every ticket — no seam-keyword filtering at this
layer. The generator handles seam detection internally. Before the per-ticket loop,ONEX_CC_REPO_PATH is auto-detected from standard paths so contracts are written directly toonex_change_control/contracts/. Only if detection fails will the YAML be printed inline with a
manual commit banner.
Main Execution Flow
# Step 1: Read plan file
content = read_plan_file(args.plan_file)
# Step 2: Detect structure
structure_type, entries = detect_structure(content)
if structure_type == 'none' or not entries:
# Fail fast with clear error
print("Plans must use ## Task N: headings (or ## Phase N: for legacy plans). Use design-to-plan.")
raise SystemExit(1)
print(f"[structure_detected] type={structure_type} entries={len(entries)}")
# Step 3: Extract epic title
epic_title = extract_epic_title(content, args.epic_title)
print(f"Epic title: {epic_title}")
# Step 4: Resolve or create epic (even in dry-run mode for preview)
epic = None
try:
epic = resolve_epic(epic_title, args.team, args.no_create_epic, args.project, dry_run=args.dry_run)
except ValueError as e:
print(f"Error: {e}")
raise SystemExit(1)
# Step 5-8: Create tickets
results = create_tickets_batch(
entries=entries,
epic=epic,
team=args.team,
project=args.project,
structure_type=structure_type,
skip_existing=args.skip_existing,
dry_run=args.dry_run,
arch_violation_override=arch_violation_override # Add warning to tickets if violations were overridden
)
# Step 9: Report summary
report_summary(results, epic, structure_type, args.dry_run)Error Handling
| Error | Behavior |
|---|---|
| Plan file not found | Report path, stop |
| No valid structure | Fail fast with example |
| Epic not found + --no-create-epic | Report and stop |
| Multiple epic matches | AskUserQuestion to disambiguate |
| Architecture violation | Abort entire batch, list violations, suggest --allow-arch-violation |
| Ticket creation fails | Log error, continue with remaining |
| Dependency not resolved | Log warning, skip dependency link (forward refs resolved in second pass) |
Examples
# Basic usage - detect structure, create epic, create tickets
/plan-to-tickets ~/.claude/plans/velvety-fluttering-sonnet.md
# With project assignment
/plan-to-tickets ~/.claude/plans/my-plan.md --project "Workflow Automation"
# Preview without creating
/plan-to-tickets ~/.claude/plans/my-plan.md --dry-run
# Auto-skip existing tickets
/plan-to-tickets ~/.claude/plans/my-plan.md --skip-existing
# Use specific epic title
/plan-to-tickets ~/.claude/plans/my-plan.md --epic-title "My Epic Title"
# Fail if epic doesn't exist
/plan-to-tickets ~/.claude/plans/my-plan.md --no-create-epic --epic-title "Existing Epic"
# With repository label (enables architecture validation)
/plan-to-tickets ~/.claude/plans/my-plan.md --repo omniclaude --project "Workflow Automation"
# Override architecture validation for cross-app dependencies
/plan-to-tickets ~/.claude/plans/my-plan.md --repo omniclaude --allow-arch-violation