levy-n

data-pipeline

Data ingestion pipelines, LLM APIs, document processing, and ML data sourcing. Covers LLM API setup (OpenAI, Gemini, Ollama, LiteLLM), PDF/HTML extraction, chunking, function calling, structured output, dataset sourcing, synthetic data generation, and augmentation. Keywords: 'PDF extraction', 'chunking', 'LiteLLM', 'function calling', 'API setup', 'OCR', 'Pydantic', 'data ingestion', 'dataset', 'Kaggle', 'HuggingFace datasets', 'synthetic data', 'SMOTE', 'augmentation', 'data sourcing', 'active learning'.

levy-n 10 1 Updated 4mo ago

Install

npx skillscat add levy-n/claude-useful-skills/data-pipeline

Install via the SkillsCat registry.

SKILL.md

Data Pipeline - LLM APIs & Document Processing

Data Ingestion, LLM APIs, PDF Parsing, and Structured Output.

Quick Start - PDF to Chunks

import pdfplumber
# Older LangChain releases expose this class as langchain.text_splitter
from langchain_text_splitters import RecursiveCharacterTextSplitter

# Extract text from PDF
def extract_pdf_text(pdf_path):
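    pages = []
    with pdfplumber.open(pdf_path) as pdf:
        for page in pdf.pages:
            # extract_text() returns None for pages with no text layer
            pages.append(page.extract_text() or "")
    # Join with newlines so words at page boundaries do not run together
    return "\n".join(pages)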

# Chunk text
text = extract_pdf_text("document.pdf")
splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=100)
chunks = splitter.split_text(text)

print(f"Created {len(chunks)} chunks")

Quick Start - LiteLLM (Unified API)

from litellm import completion

# Same interface for all providers!
response = completion(
    model="gpt-4",  # or "claude-3-opus-20240229", "gemini/gemini-pro"
    messages=[{"role": "user", "content": "Hello!"}]
)
print(response.choices[0].message.content)

When This Skill Activates

Use this skill when:

  • Setting up LLM API connections
  • Parsing PDFs, HTML, or documents
  • Building data ingestion pipelines
  • Creating document chunks for RAG
  • Implementing function calling / tools
  • Extracting structured data from LLMs

Core Patterns

Pattern 1: LLM API Setup

# OpenAI
from openai import OpenAI
import os

client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])

response = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "Hello"}],
    temperature=0.7
)

# Gemini
import google.generativeai as genai

genai.configure(api_key=os.environ["GOOGLE_API_KEY"])
model = genai.GenerativeModel("gemini-pro")
response = model.generate_content("Hello")

# Ollama (Local)
import ollama

response = ollama.chat(
    model="llama3",
    messages=[{"role": "user", "content": "Hello"}]
)

# LiteLLM (Unified - RECOMMENDED)
from litellm import completion

# Works with any provider!
response = completion(
    model="gpt-4",  # or "claude-3-opus-20240229", "gemini/gemini-pro", "ollama/llama3"
    messages=[{"role": "user", "content": "Hello"}]
)

Pattern 2: PDF Extraction Libraries

| Library        | Best For                      | Tables    | Speed  |
|----------------|-------------------------------|-----------|--------|
| pdfplumber     | General extraction, tables    | Excellent | Medium |
| PyMuPDF (fitz) | Speed, complex layouts        | Good      | Fast   |
| unstructured   | Mixed content, auto-detection | Good      | Slow   |
| pytesseract    | Scanned PDFs (OCR)            | No        | Slow   |

# pdfplumber (RECOMMENDED for tables)
import pdfplumber

with pdfplumber.open("doc.pdf") as pdf:
    for page in pdf.pages:
        text = page.extract_text() or ""  # None when a page has no text layer
        tables = page.extract_tables()

# PyMuPDF (FAST)
import fitz

doc = fitz.open("doc.pdf")
for page in doc:
    text = page.get_text()
    # Also supports images, annotations

# OCR for scanned PDFs
from pdf2image import convert_from_path
import pytesseract

images = convert_from_path("scanned.pdf")
text = ""
for img in images:
    text += pytesseract.image_to_string(img)
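
The `tables` value from `page.extract_tables()` is a list of tables, each a list of rows, and empty cells can come back as `None`. A minimal sketch of turning one such table into dictionaries keyed by the header row is shown here. `table_to_records` is a hypothetical helper, and it assumes the first row is the header, which will not hold for every PDF.

```python
from typing import Dict, List, Optional

Table = List[List[Optional[str]]]


def table_to_records(table: Table) -> List[Dict[str, str]]:
    """Convert one extracted table into row dicts (assumes row 0 is the header)."""
    if not table:
        return []
    # Empty cells may be None; normalise them to empty strings.
    header = [(cell or "").strip() for cell in table[0]]
    records = []
    for row in table[1:]:
        cells = [(cell or "").strip() for cell in row]
        # Skip rows that are entirely empty.
        if not any(cells):
            continue
        records.append(dict(zip(header, cells)))
    return records


# Sample data shaped like one table from extract_tables()
sample = [["Name", "Qty"], ["Widget", "3"], [None, None], ["Gadget", None]]
records = table_to_records(sample)
print(records)
```

Records in this shape can be passed to `json.dumps` or loaded into a DataFrame before chunking.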

Pattern 3: Chunking Strategies

from langchain_text_splitters import (  # older LangChain: langchain.text_splitter
    RecursiveCharacterTextSplitter,
    CharacterTextSplitter,
    TokenTextSplitter
)

# Recursive (RECOMMENDED - respects paragraphs)
splitter = RecursiveCharacterTextSplitter(
    chunk_size=500,
    chunk_overlap=100,
    separators=["\n\n", "\n", ". ", " ", ""]
)

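# By tokens (for token-limited APIs)
# Named separately so it does not overwrite the recursive splitter above
token_splitter = TokenTextSplitter(
    chunk_size=500,      # In tokens
    chunk_overlap=50
)

# With metadata enrichment (uses the recursive `splitter` defined above)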
def chunk_with_metadata(text, source_file):
    chunks = splitter.split_text(text)
    return [
        {
            "text": chunk,
            "source": source_file,
            "chunk_id": f"{source_file}_{i}",
            "char_count": len(chunk)
        }
        for i, chunk in enumerate(chunks)
    ]

Pattern 4: Function Calling (Tools)

import json

from openai import OpenAI

client = OpenAI()

# Define tools
tools = [
    {
        "type": "function",
        "function": {
            "name": "get_weather",
            "description": "Get current weather for a location",
            "parameters": {
                "type": "object",
                "properties": {
                    "location": {
                        "type": "string",
                        "description": "City name"
                    },
                    "unit": {
                        "type": "string",
                        "enum": ["celsius", "fahrenheit"]
                    }
                },
                "required": ["location"]
            }
        }
    }
]

# Call with tools
response = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "What's the weather in Paris?"}],
    tools=tools,
    tool_choice="auto"
)

# Check if tool was called
if response.choices[0].message.tool_calls:
    tool_call = response.choices[0].message.tool_calls[0]
    function_name = tool_call.function.name
    arguments = json.loads(tool_call.function.arguments)
    print(f"Call {function_name} with {arguments}")
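
Once a tool call is detected, your code still has to run the function and report the result back to the model. The sketch below shows one way to do that with the standard library only. `get_weather` is a hypothetical stand-in that returns fixed data, `TOOL_REGISTRY`, `dispatch_tool_call`, and `tool_result_messages` are illustrative names, and `"call_123"` is a made-up ID (a real one comes from `tool_call.id`).

```python
import json
from typing import Any, Callable, Dict, List


def get_weather(location: str, unit: str = "celsius") -> Dict[str, Any]:
    """Hypothetical stand-in; a real version would call a weather service."""
    return {"location": location, "unit": unit, "temp": 22, "condition": "sunny"}


# Map tool names (as declared in the `tools` schema) to local callables.
TOOL_REGISTRY: Dict[str, Callable[..., Any]] = {"get_weather": get_weather}


def dispatch_tool_call(name: str, raw_arguments: str) -> str:
    """Run the named tool and return a JSON string to send back to the model."""
    if name not in TOOL_REGISTRY:
        return json.dumps({"error": f"unknown tool: {name}"})
    try:
        arguments = json.loads(raw_arguments)
        result = TOOL_REGISTRY[name](**arguments)
    except (json.JSONDecodeError, TypeError) as exc:
        # Malformed JSON, or missing/unexpected arguments from the model.
        return json.dumps({"error": str(exc)})
    return json.dumps(result)


def tool_result_messages(
    tool_call_id: str, name: str, raw_arguments: str, output: str
) -> List[Dict[str, Any]]:
    """Build the assistant tool-call message and the matching tool result message."""
    return [
        {
            "role": "assistant",
            "content": None,
            "tool_calls": [
                {
                    "id": tool_call_id,
                    "type": "function",
                    "function": {"name": name, "arguments": raw_arguments},
                }
            ],
        },
        {"role": "tool", "tool_call_id": tool_call_id, "content": output},
    ]


raw_arguments = '{"location": "Paris"}'
output = dispatch_tool_call("get_weather", raw_arguments)
follow_up = tool_result_messages("call_123", "get_weather", raw_arguments, output)
print(output)
```

Appending `follow_up` to the original `messages` list and calling `client.chat.completions.create` again lets the model turn the tool output into a final answer.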

Pattern 5: LangChain Tools & Agents

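# Note: initialize_agent and langchain.llms are legacy LangChain APIs;
# newer releases deprecate them, so check the docs for your installed version.
from langchain.agents import initialize_agent, Tool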
from langchain.llms import OpenAI
from langchain.tools import tool

# Define tool with decorator
@tool
def search_database(query: str) -> str:
    """Search the company database for information."""
    # Your search logic here
    return f"Results for: {query}"

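@tool
def calculate(expression: str) -> str:
    """Evaluate a mathematical expression."""
    # WARNING: eval() executes arbitrary Python. Never pass it model- or
    # user-supplied text in production; use a restricted arithmetic parser.
    return str(eval(expression))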

# Create agent
tools = [search_database, calculate]
llm = OpenAI(temperature=0)

agent = initialize_agent(
    tools=tools,
    llm=llm,
    agent="zero-shot-react-description",
    verbose=True
)

# Run
result = agent.run("Search for sales data and calculate 15% of 1000")
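
Because an agent passes model-generated text to its tools, a calculator tool is safer when it only accepts arithmetic. The following is a sketch of one possible restricted evaluator built on the standard `ast` module. `safe_calculate` is a hypothetical replacement for the body of `calculate`, and the set of allowed operators is an assumption (exponentiation is left out on purpose to avoid very large results).

```python
import ast
import operator
from typing import Union

Number = Union[int, float]

# Only these arithmetic operators are accepted.
_BINARY_OPS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
    ast.Mod: operator.mod,
}
_UNARY_OPS = {ast.UAdd: operator.pos, ast.USub: operator.neg}


def safe_calculate(expression: str) -> Number:
    """Evaluate +, -, *, /, % on numeric literals; reject everything else."""

    def _eval(node: ast.AST) -> Number:
        # Plain int/float literals only (bool and str constants are rejected).
        if isinstance(node, ast.Constant) and type(node.value) in (int, float):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in _BINARY_OPS:
            return _BINARY_OPS[type(node.op)](_eval(node.left), _eval(node.right))
        if isinstance(node, ast.UnaryOp) and type(node.op) in _UNARY_OPS:
            return _UNARY_OPS[type(node.op)](_eval(node.operand))
        # Names, calls, attribute access, etc. all end up here.
        raise ValueError("unsupported expression")

    tree = ast.parse(expression, mode="eval")
    return _eval(tree.body)


print(safe_calculate("2 + 3 * 4"))
print(safe_calculate("-(10 - 4) / 3"))
```

Function calls such as `__import__('os')` raise `ValueError` instead of executing, and malformed input raises `SyntaxError` from `ast.parse`.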

Pattern 6: Structured Output with Pydantic

from pydantic import BaseModel, Field
from typing import List
from openai import OpenAI
import json

# Define schema
class ExtractedEntity(BaseModel):
    name: str = Field(description="Entity name")
    type: str = Field(description="Entity type: person, org, location")
    confidence: float = Field(ge=0, le=1)

class ExtractionResult(BaseModel):
    entities: List[ExtractedEntity]
    summary: str

# Get structured output
client = OpenAI()

response = client.chat.completions.create(
    model="gpt-4",
    messages=[
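        # json.dumps emits valid JSON; interpolating the dict directly would give a Python repr
        {"role": "system", "content": f"Extract entities. Return JSON matching this schema: {json.dumps(ExtractionResult.model_json_schema())}"},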
        {"role": "user", "content": "Apple CEO Tim Cook announced new products in Cupertino."}
    ],
    response_format={"type": "json_object"}
)

# Parse and validate
result = ExtractionResult.model_validate_json(response.choices[0].message.content)
print(result.entities)
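
Validation can fail when the model returns malformed JSON or omits a required field. A sketch of a retry loop around the parse step is shown below. `call_model` is a hypothetical callable standing in for the API request, `validate` stands in for a validator such as `ExtractionResult.model_validate_json`, and the sketch assumes the validator signals failure with `ValueError` (as `json.JSONDecodeError` does).

```python
import json
from typing import Any, Callable


def extract_with_retry(
    call_model: Callable[[str], str],
    validate: Callable[[str], Any],
    prompt: str,
    max_attempts: int = 3,
) -> Any:
    """Call the model until `validate` accepts its output or attempts run out."""
    last_error = None
    for _ in range(max_attempts):
        raw = call_model(prompt)
        try:
            return validate(raw)
        except ValueError as exc:
            last_error = exc
            # Tell the model what went wrong before the next attempt.
            prompt = (
                f"{prompt}\n\nThe previous reply was invalid ({exc}). "
                "Return valid JSON only."
            )
    raise RuntimeError(f"no valid output after {max_attempts} attempts") from last_error


# Demo with a fake model that fails once, then returns valid JSON.
replies = iter(["not json", '{"entities": [], "summary": "ok"}'])


def fake_model(prompt: str) -> str:
    return next(replies)


def validate(raw: str) -> dict:
    data = json.loads(raw)
    if "entities" not in data or "summary" not in data:
        raise ValueError("missing required keys")
    return data


result = extract_with_retry(fake_model, validate, "Extract entities.")
print(result["summary"])
```

Feeding the validation error back into the prompt is a design choice, not a requirement; a plain retry with the original prompt also works.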

Pattern 7: HTML Parsing for LLM

from bs4 import BeautifulSoup
import requests

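def extract_main_content(url):
    """Extract main text content from webpage."""
    response = requests.get(url, timeout=10)  # Avoid hanging on slow servers
    response.raise_for_status()  # Fail early on 4xx/5xx responses
    soup = BeautifulSoup(response.text, 'html.parser')

    # Remove scripts, styles, and page chrome (nav, header, footer)
    for tag in soup(['script', 'style', 'nav', 'footer', 'header']):
        tag.decompose()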

    # Get text
    text = soup.get_text(separator='\n')

    # Clean up whitespace
    lines = [line.strip() for line in text.splitlines() if line.strip()]
    return '\n'.join(lines)

# Use with LLM
from openai import OpenAI

client = OpenAI()
text = extract_main_content("https://example.com/article")
response = client.chat.completions.create(
    model="gpt-4",
    messages=[
        {"role": "system", "content": "Summarize this article."},
        {"role": "user", "content": text}
    ]
)

Pattern 8: Multi-Provider Comparison

from litellm import completion
import time

providers = [
    ("gpt-4", "openai"),
    ("claude-3-opus-20240229", "anthropic"),
    ("gemini/gemini-pro", "google"),
]

prompt = "Explain machine learning in one sentence."

for model, provider in providers:
    start = time.time()
    try:
        response = completion(
            model=model,
            messages=[{"role": "user", "content": prompt}]
        )
        elapsed = time.time() - start
        print(f"\n{provider} ({elapsed:.2f}s):")
        print(response.choices[0].message.content)
    except Exception as e:
        print(f"{provider}: Error - {e}")

Reference Navigation

For detailed content, see:

  • LLM API Setup: reference/llm_api_setup.md - OpenAI, Gemini, Ollama, LiteLLM
  • PDF Extraction: reference/pdf_extraction.md - pdfplumber, PyMuPDF, OCR
  • Chunking Strategies: reference/chunking_strategies.md - Fixed, Overlapping, Semantic
  • Function Calling: reference/function_calling.md - Tools, JSON Schema, Agents
  • Structured Output: reference/structured_output.md - Pydantic, JSON validation
  • Data Foundations: reference/data_foundations.md - NumPy, Pandas, SQL, EDA
  • Data Sourcing & Augmentation: reference/data_sourcing.md - Datasets, synthetic data, augmentation, scraping

Common Mistakes to Avoid

1. Hardcoding API Keys

# WRONG: Key in code
client = OpenAI(api_key="sk-...")  # Never commit this!

# CORRECT: Use environment variables
import os
client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])

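# Or load a .env file with python-dotenv before creating the client
from dotenv import load_dotenv
load_dotenv()  # Copies variables from .env into os.environ
client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])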
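
Reading `os.environ["OPENAI_API_KEY"]` raises a bare `KeyError` when the variable is missing. A small helper can fail with a clearer message instead. This is only a sketch: `require_env` is a hypothetical name, and `DEMO_API_KEY` is a placeholder variable used for the demonstration.

```python
import os


def require_env(name: str) -> str:
    """Return the value of an environment variable or raise a descriptive error."""
    value = os.environ.get(name, "").strip()
    if not value:
        raise RuntimeError(
            f"Environment variable {name} is not set; export it or add it to .env"
        )
    return value


# Demonstration with a placeholder variable (not a real key).
os.environ["DEMO_API_KEY"] = "placeholder-value"
print(require_env("DEMO_API_KEY"))
```

Calling `require_env` once at startup surfaces a missing key before any request is made.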

2. Not Handling PDF Extraction Errors

# WRONG: Assumes all pages have text
text = page.extract_text()  # May return None!

# CORRECT: Handle None
text = page.extract_text() or ""

3. Ignoring Token Limits

# WRONG: Send entire document
response = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": huge_document}]  # May exceed limit!
)

# CORRECT: Chunk and process
chunks = splitter.split_text(huge_document)
results = []
for chunk in chunks:
    response = client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": chunk}]
    )
    results.append(response.choices[0].message.content)
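
Before sending text, it can help to estimate whether it fits the model's context window. The sketch below uses a rough rule of thumb of about 4 characters per token for English prose. That ratio is an assumption, not a tokenizer, so use the provider's own tokenizer when accuracy matters. `estimate_tokens`, `fits_context`, and the `8192` limit are illustrative.

```python
import math

CHARS_PER_TOKEN = 4  # rough heuristic for English prose, not a real tokenizer


def estimate_tokens(text: str) -> int:
    """Approximate the token count from the character count."""
    return math.ceil(len(text) / CHARS_PER_TOKEN)


def fits_context(text: str, context_limit: int, reserved_for_reply: int = 500) -> bool:
    """Check the estimate against the limit, leaving room for the model's reply."""
    return estimate_tokens(text) + reserved_for_reply <= context_limit


short_doc = "word " * 100      # 500 characters, roughly 125 tokens
huge_doc = "word " * 40000     # 200,000 characters, roughly 50,000 tokens

print(fits_context(short_doc, context_limit=8192))
print(fits_context(huge_doc, context_limit=8192))
```

Documents that fail the check are candidates for the chunk-and-process loop.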

4. No Error Handling for API Calls

# WRONG: Assumes API always succeeds
response = completion(model="gpt-4", messages=[...])

# CORRECT: Handle errors and retries
import time
from litellm import completion
from litellm.exceptions import RateLimitError

def call_with_retry(messages, max_retries=3):
    for attempt in range(max_retries):
        try:
            return completion(model="gpt-4", messages=messages)
        except RateLimitError:
            if attempt == max_retries - 1:
                break  # No point sleeping after the final attempt
            wait = 2 ** attempt  # Exponential backoff: 1s, 2s, 4s, ...
            print(f"Rate limited, waiting {wait}s...")
            time.sleep(wait)
    raise RuntimeError("Max retries exceeded")
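
The same idea can be written as a provider-agnostic helper. The sketch below adds random jitter to the exponential delay, which is an addition to the pattern above and not something the original code does. `retry_with_backoff` and its parameters are hypothetical, and the `sleep` argument exists only so the demo can run without waiting.

```python
import random
import time
from typing import Callable, List, Tuple, Type, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    func: Callable[[], T],
    retry_on: Tuple[Type[Exception], ...],
    max_retries: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call `func`, retrying on the given exceptions with exponential backoff."""
    for attempt in range(max_retries):
        try:
            return func()
        except retry_on:
            if attempt == max_retries - 1:
                raise  # Out of attempts: surface the original error
            # Exponential delay plus random jitter of up to the same amount.
            delay = base_delay * (2 ** attempt)
            sleep(delay + random.uniform(0, delay))
    raise RuntimeError("max_retries must be at least 1")


# Demo: a function that fails twice, then succeeds.
calls = {"count": 0}


def flaky() -> str:
    calls["count"] += 1
    if calls["count"] < 3:
        raise TimeoutError("temporary failure")
    return "ok"


delays: List[float] = []
result = retry_with_backoff(flaky, retry_on=(TimeoutError,), sleep=delays.append)
print(result, len(delays))
```

With LiteLLM, `func` could be a lambda wrapping `completion(...)` and `retry_on` could be `(RateLimitError,)`.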

5. Chunking Without Overlap

# WRONG: Context lost at chunk boundaries
splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=0)

# CORRECT: Add overlap
splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=100)
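
To see what overlap does at chunk boundaries, here is a simplified fixed-window chunker. It is not how `RecursiveCharacterTextSplitter` works internally (that class splits on separators first); `sliding_chunks` is a hypothetical helper meant only to show how consecutive chunks share text.

```python
from typing import List


def sliding_chunks(text: str, chunk_size: int, chunk_overlap: int) -> List[str]:
    """Split text into fixed-size windows where each window repeats the
    last `chunk_overlap` characters of the previous one."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be >= 0 and smaller than chunk_size")
    step = chunk_size - chunk_overlap
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        # Stop once a window reaches the end of the text.
        if start + chunk_size >= len(text):
            break
    return chunks


text = "abcdefghijklmnopqrstuvwxyz"
chunks = sliding_chunks(text, chunk_size=10, chunk_overlap=4)
print(chunks)
```

Each chunk begins with the final four characters of the one before it, so a sentence cut at a boundary still appears whole in at least one chunk when the overlap is long enough.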

Teaching Mode

When explaining data pipelines, use these diagrams:

PDF Extraction Visual

PDF Document
    │
    ├── Text Pages ──────► pdfplumber/PyMuPDF
    │                           │
    ├── Tables ──────────► pdfplumber.extract_tables()
    │                           │
    ├── Images ──────────► PyMuPDF + OCR
    │                           │
    └── Scanned Pages ───► pdf2image + pytesseract
                                │
                                ▼
                          Raw Text
                                │
                                ▼
                       Text Splitter
                                │
                                ▼
                       Chunks + Metadata
                                │
                                ▼
                       Vector Store / RAG
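
The routing in the diagram can be sketched as a per-page decision: use the extracted text when there is enough of it, otherwise fall back to OCR. `text_or_ocr` is a hypothetical helper, the `min_chars=20` threshold is an arbitrary assumption to tune per corpus, and `run_ocr` is a callable you supply (for example, one wrapping `pytesseract.image_to_string`).

```python
from typing import Callable, Optional


def text_or_ocr(
    extracted: Optional[str],
    run_ocr: Callable[[], str],
    min_chars: int = 20,
) -> str:
    """Return the extracted text, or the OCR result when the page looks scanned."""
    text = (extracted or "").strip()
    if len(text) >= min_chars:
        return text
    # Little or no text layer: treat the page as scanned and run OCR lazily.
    return run_ocr().strip()


# Demo with stand-in OCR callables instead of real image processing.
digital_page = text_or_ocr(
    "This page has a proper text layer.", run_ocr=lambda: "unused"
)
scanned_page = text_or_ocr(None, run_ocr=lambda: " text recovered by OCR ")
print(digital_page)
print(scanned_page)
```

Passing OCR as a callable means the slow step only runs for pages that need it.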

Function Calling Flow

User: "What's the weather in Paris?"
            │
            ▼
    ┌───────────────┐
    │   LLM thinks  │
    │ "Need weather │
    │   function"   │
    └───────────────┘
            │
            ▼
    Tool Call: get_weather(location="Paris")
            │
            ▼
    ┌───────────────┐
    │ Your Code     │
    │ Calls Weather │
    │     API       │
    └───────────────┘
            │
            ▼
    Result: {"temp": 22, "condition": "sunny"}
            │
            ▼
    ┌───────────────┐
    │ LLM formats   │
    │   response    │
    └───────────────┘
            │
            ▼
    "The weather in Paris is 22°C and sunny."