Module 7.3

Analytics Automation

Learn to automate repetitive analytics tasks with Python. Build data pipelines, schedule reports, send automated emails, and collect data from the web.

45 min
Intermediate
Hands-on
What You'll Learn
  • Building automated data pipelines
  • Scheduling with cron & Task Scheduler
  • Sending automated email reports
  • Web scraping with BeautifulSoup
  • Error handling & logging
Contents
01

Introduction to Automation

Analysts spend up to 80% of their time on repetitive tasks: fetching data, cleaning it, generating reports, and sending updates. Automation eliminates this drudgery, letting you focus on insights rather than mechanics. Python's rich ecosystem makes it the ideal language for building automated analytics workflows.

Key Concept

What is Analytics Automation?

Analytics automation is the process of using scripts and tools to perform data tasks without manual intervention. This includes extracting data from sources, transforming it, generating visualizations, and distributing reports - all triggered automatically on a schedule or event.

Business Impact: Automated pipelines reduce errors, ensure consistency, and free analysts to focus on strategic analysis rather than data wrangling.

Why Automate?

Manual processes are error-prone and time-consuming. A single typo in a formula or a forgotten step can cascade into major issues. Automation provides repeatability, auditability, and scalability - run the same pipeline on 100 files as easily as on one.

Time Savings

Turn hours of manual work into seconds of script execution

Consistency

Same process every time - no forgotten steps or human errors

Scalability

Process 10 files or 10,000 with the same script

Auditability

Code serves as documentation - track exactly what happened

Automation Building Blocks

A typical analytics automation workflow combines several components: data extraction, transformation, output generation, scheduling, and notification. Python provides libraries for each:

Task | Python Library | Purpose
Data extraction | pandas, requests, sqlalchemy | Read from files, APIs, databases
Transformation | pandas, numpy | Clean, filter, aggregate data
Visualization | matplotlib, seaborn, plotly | Generate charts and dashboards
Reporting | openpyxl, fpdf, jinja2 | Create Excel, PDF, HTML reports
Email | smtplib, email | Send automated notifications
Web scraping | beautifulsoup4, requests | Collect data from websites
Scheduling | schedule, APScheduler, cron | Run scripts at set times

A Simple Automation Example

Here's a minimal script that reads sales data, calculates totals, and saves a summary. This is the foundation of any automated pipeline - input, process, output.

import pandas as pd
from datetime import datetime

# 1. Extract - Read data
df = pd.read_csv("daily_sales.csv")

# 2. Transform - Calculate metrics
summary = df.groupby("product")["revenue"].agg(["sum", "mean", "count"])
summary.columns = ["Total Revenue", "Avg Sale", "Transactions"]

# 3. Load - Save output with timestamp
timestamp = datetime.now().strftime("%Y%m%d_%H%M")
output_file = f"sales_summary_{timestamp}.csv"
summary.to_csv(output_file)

print(f"Report saved: {output_file}")
ETL Pattern: This Extract-Transform-Load (ETL) pattern is the backbone of data automation. Master it, and you can build pipelines of any complexity.

Practice Questions

Task: Modify the script to only include sales from the last 7 days.

df = pd.read_csv("daily_sales.csv")
df["date"] = pd.to_datetime(df["date"])
Solution:
from datetime import datetime, timedelta

df = pd.read_csv("daily_sales.csv")
df["date"] = pd.to_datetime(df["date"])

# Filter last 7 days
cutoff = datetime.now() - timedelta(days=7)
df = df[df["date"] >= cutoff]

summary = df.groupby("product")["revenue"].sum()
print(summary)

Task: Add error handling so the script logs a warning if the input file is missing instead of crashing.

Solution:
import pandas as pd
import logging
import os

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

input_file = "daily_sales.csv"

if not os.path.exists(input_file):
    logger.warning(f"File not found: {input_file}")
else:
    df = pd.read_csv(input_file)
    summary = df.groupby("product")["revenue"].sum()
    logger.info(f"Processed {len(df)} records")
    print(summary)
02

Automating Data Pipelines

A data pipeline is a series of steps that move data from sources to destinations, transforming it along the way. Well-designed pipelines are modular, testable, and resilient to failures. This section shows how to structure Python scripts for maintainable automation.

Framework

Pipeline Design Principles

  • Idempotent: Running the pipeline twice produces the same result - no duplicates
  • Modular: Each step is a function that can be tested independently
  • Logged: Every action is recorded for debugging and auditing
  • Configurable: Paths, credentials, and parameters live in config files, not code
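The first principle, idempotency, deserves a concrete sketch. One common trick is to derive the output name from the data's logical date rather than the wall clock, so a re-run simply overwrites the previous result instead of creating a duplicate. A minimal sketch, with illustrative file and directory names:

```python
import pandas as pd
from pathlib import Path

def idempotent_load(df: pd.DataFrame, run_date: str, output_dir: str = "processed") -> Path:
    """Write output to a name derived from the run date, not the wall clock.

    Re-running the pipeline for the same date overwrites the previous file,
    so the result is the same no matter how many times it runs.
    """
    out_dir = Path(output_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    out_file = out_dir / f"sales_clean_{run_date}.csv"  # deterministic name
    df.to_csv(out_file, index=False)
    return out_file

# Running twice for the same date produces one file, not two
df = pd.DataFrame({"product": ["A", "B"], "revenue": [100, 200]})
idempotent_load(df, "2024-01-15")
path = idempotent_load(df, "2024-01-15")
print(path.name)  # sales_clean_2024-01-15.csv
```

Contrast this with the timestamped filenames used for archival reports: those are fine for outputs you want to keep per run, but they break idempotency if a downstream step consumes "all files in the directory".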

Structuring a Pipeline Script

Organize your pipeline into clear functions for extraction, transformation, and loading. Use a main function to orchestrate the flow. This structure makes debugging and testing straightforward.

import pandas as pd
import logging
from pathlib import Path
from datetime import datetime

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        logging.FileHandler("pipeline.log"),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

def extract(source_path: str) -> pd.DataFrame:
    """Extract data from source file."""
    logger.info(f"Extracting from {source_path}")
    df = pd.read_csv(source_path)
    logger.info(f"Extracted {len(df)} rows")
    return df

def transform(df: pd.DataFrame) -> pd.DataFrame:
    """Clean and transform data."""
    logger.info("Starting transformation")
    
    # Remove duplicates
    initial_rows = len(df)
    df = df.drop_duplicates()
    logger.info(f"Removed {initial_rows - len(df)} duplicates")
    
    # Handle missing values
    df = df.fillna({"revenue": 0, "quantity": 0})
    
    # Add calculated columns
    df["total"] = df["revenue"] * df["quantity"]
    
    return df

def load(df: pd.DataFrame, output_path: str) -> None:
    """Save transformed data to destination."""
    logger.info(f"Loading to {output_path}")
    df.to_csv(output_path, index=False)
    logger.info(f"Saved {len(df)} rows")

def main():
    """Orchestrate the ETL pipeline."""
    try:
        # Configuration
        source = "raw_data/sales_2024.csv"
        timestamp = datetime.now().strftime("%Y%m%d")
        output = f"processed/sales_clean_{timestamp}.csv"
        
        # Run pipeline
        data = extract(source)
        data = transform(data)
        load(data, output)
        
        logger.info("Pipeline completed successfully")
        
    except Exception as e:
        logger.error(f"Pipeline failed: {e}")
        raise

if __name__ == "__main__":
    main()

Configuration Files

Never hardcode file paths or credentials. Use YAML or JSON config files that can be changed without modifying code. This also allows different configurations for development vs production.

# config.yaml
# ---
# source_path: "data/raw/sales.csv"
# output_dir: "data/processed"
# database:
#   host: "localhost"
#   port: 5432
#   name: "analytics"

import yaml

def load_config(config_path: str = "config.yaml") -> dict:
    """Load configuration from YAML file."""
    with open(config_path, "r") as f:
        return yaml.safe_load(f)

# Usage
config = load_config()
source = config["source_path"]
output_dir = config["output_dir"]

Processing Multiple Files

Real pipelines often process batches of files. Use pathlib to iterate over directories and process each file with the same logic.

from pathlib import Path
import pandas as pd

def process_all_files(input_dir: str, output_dir: str) -> None:
    """Process all CSV files in a directory."""
    input_path = Path(input_dir)
    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)
    
    # Find all CSV files
    csv_files = list(input_path.glob("*.csv"))
    print(f"Found {len(csv_files)} files to process")
    
    for file in csv_files:
        print(f"Processing: {file.name}")
        
        # Read, transform, save
        df = pd.read_csv(file)
        df = df.drop_duplicates()
        df["processed_date"] = pd.Timestamp.now()
        
        # Save to output directory
        output_file = output_path / f"clean_{file.name}"
        df.to_csv(output_file, index=False)
    
    print("All files processed!")

# Run
process_all_files("raw_data/", "processed_data/")
Pro Tip: Always process files into a separate output directory. Never overwrite source files - you may need them for debugging or reprocessing.

Database Integration

For production pipelines, data often comes from or goes to databases. SQLAlchemy provides a clean interface for database operations.

import pandas as pd
from sqlalchemy import create_engine

# Create database connection (credentials inline for brevity - load them from config/env in practice)
engine = create_engine("postgresql://user:pass@localhost:5432/analytics")

# Extract from database
query = """
    SELECT product_id, SUM(revenue) as total_revenue
    FROM sales
    WHERE sale_date >= '2024-01-01'
    GROUP BY product_id
"""
df = pd.read_sql(query, engine)

# Transform
df["revenue_category"] = pd.cut(
    df["total_revenue"], 
    bins=[0, 1000, 10000, float("inf")],
    labels=["Low", "Medium", "High"]
)

# Load back to database
df.to_sql("product_summary", engine, if_exists="replace", index=False)

Practice Questions

Task: Modify the transform function to raise an error if the DataFrame has fewer than 10 rows after cleaning.

Solution:
def transform(df: pd.DataFrame) -> pd.DataFrame:
    df = df.drop_duplicates()
    df = df.dropna()
    
    if len(df) < 10:
        raise ValueError(f"Too few rows after cleaning: {len(df)}")
    
    return df

Task: Create a function that returns a dictionary with pipeline statistics: rows processed, duplicates removed, and processing time.

Solution:
import time

def run_pipeline_with_stats(source: str) -> dict:
    start_time = time.time()
    
    df = pd.read_csv(source)
    initial_rows = len(df)
    
    df = df.drop_duplicates()
    final_rows = len(df)
    
    elapsed = time.time() - start_time
    
    return {
        "source": source,
        "initial_rows": initial_rows,
        "final_rows": final_rows,
        "duplicates_removed": initial_rows - final_rows,
        "processing_time_sec": round(elapsed, 2)
    }

stats = run_pipeline_with_stats("sales.csv")
print(stats)

Task: Modify the pipeline to only process files modified since the last run. Store the last run timestamp in a JSON file.

Solution:
import json
import os
from datetime import datetime
from pathlib import Path

STATE_FILE = "pipeline_state.json"

def get_last_run() -> datetime:
    if os.path.exists(STATE_FILE):
        with open(STATE_FILE, "r") as f:
            state = json.load(f)
            return datetime.fromisoformat(state["last_run"])
    return datetime.min

def save_run_time():
    with open(STATE_FILE, "w") as f:
        json.dump({"last_run": datetime.now().isoformat()}, f)

def process_new_files(input_dir: str):
    last_run = get_last_run()
    
    for file in Path(input_dir).glob("*.csv"):
        file_mtime = datetime.fromtimestamp(file.stat().st_mtime)
        
        if file_mtime > last_run:
            print(f"Processing new file: {file.name}")
            df = pd.read_csv(file)
            # ... process df
        else:
            print(f"Skipping unchanged: {file.name}")
    
    save_run_time()

process_new_files("data/")
03

Scheduling Tasks

A pipeline is only useful if it runs automatically. Task schedulers trigger your scripts at specified times - daily reports, hourly data refreshes, or weekly summaries. This section covers both OS-level schedulers (cron, Task Scheduler) and Python-based scheduling.

Concept

Scheduling Options

  • Cron (Linux/Mac): Built-in OS scheduler with powerful time expressions
  • Task Scheduler (Windows): GUI-based scheduler for Windows systems
  • Python schedule library: Simple in-process scheduling for scripts
  • APScheduler: Advanced Python scheduler with persistence

Cron Jobs (Linux/Mac)

Cron is the standard task scheduler on Unix systems. Each cron job has five time fields followed by the command to run. Use crontab -e to edit your schedule.

# Cron time format:
# MIN HOUR DAY MONTH WEEKDAY command

# Run daily at 6:00 AM
0 6 * * * /usr/bin/python3 /home/user/scripts/daily_report.py

# Run every Monday at 9:00 AM
0 9 * * 1 /usr/bin/python3 /home/user/scripts/weekly_summary.py

# Run every hour
0 * * * * /usr/bin/python3 /home/user/scripts/hourly_check.py

# Run every 15 minutes
*/15 * * * * /usr/bin/python3 /home/user/scripts/data_sync.py

# Run on the 1st of every month at midnight
0 0 1 * * /usr/bin/python3 /home/user/scripts/monthly_archive.py
Field | Values | Special Characters
Minute | 0-59 | * (any), */n (every n), , (list)
Hour | 0-23 | - (range), e.g., 9-17
Day of Month | 1-31 | Combine: 0 9,17 * * * = 9 AM and 5 PM
Month | 1-12 |
Day of Week | 0-6 (Sun=0) |
Pro Tip: Always use absolute paths in cron jobs. The cron environment doesn't inherit your shell's PATH variable. Redirect output to a log file: ... >> /var/log/myscript.log 2>&1

Windows Task Scheduler

Windows Task Scheduler provides a GUI for scheduling tasks. You can also create tasks via PowerShell for automation.

# PowerShell: Create a scheduled task to run daily at 8 AM
$action = New-ScheduledTaskAction -Execute "python" -Argument "C:\Scripts\daily_report.py"
$trigger = New-ScheduledTaskTrigger -Daily -At 8:00AM
$settings = New-ScheduledTaskSettingsSet -StartWhenAvailable

Register-ScheduledTask -TaskName "DailyAnalyticsReport" `
    -Action $action `
    -Trigger $trigger `
    -Settings $settings `
    -Description "Generate daily analytics report"

# View existing tasks
Get-ScheduledTask | Where-Object {$_.TaskName -like "*Analytics*"}

# Run a task manually
Start-ScheduledTask -TaskName "DailyAnalyticsReport"

Python schedule Library

For simpler needs, the schedule library lets you define schedules directly in Python. The script must stay running for scheduled jobs to execute.

# pip install schedule
import schedule
import time
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def generate_report():
    logger.info("Generating daily report...")
    # Your pipeline code here
    logger.info("Report complete!")

def sync_data():
    logger.info("Syncing data...")
    # Data sync logic here

# Schedule jobs
schedule.every().day.at("06:00").do(generate_report)
schedule.every().monday.at("09:00").do(generate_report)
schedule.every(15).minutes.do(sync_data)
schedule.every().hour.do(sync_data)

# Run the scheduler
logger.info("Scheduler started. Press Ctrl+C to exit.")
while True:
    schedule.run_pending()
    time.sleep(60)  # Check every minute

APScheduler for Production

APScheduler is more robust for production use. It supports multiple backends (memory, database, Redis) and can persist schedules across restarts.

# pip install apscheduler
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from datetime import datetime

scheduler = BlockingScheduler()

def morning_report():
    print(f"[{datetime.now()}] Running morning report")

def hourly_check():
    print(f"[{datetime.now()}] Running hourly check")

# Add jobs with cron-like syntax
scheduler.add_job(morning_report, CronTrigger(hour=6, minute=0))
scheduler.add_job(hourly_check, CronTrigger(minute=0))  # Every hour

# Add job with interval
scheduler.add_job(
    hourly_check,
    "interval",
    minutes=30,
    id="data_sync"
)

print("Scheduler starting...")
scheduler.start()
Important: Python schedulers (schedule, APScheduler) require the script to stay running. For reliability, consider running them as system services or using OS-level schedulers.
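Whether or not you run the loop as a service, no single job should be able to kill it. A minimal sketch of a per-job guard that logs exceptions instead of letting them propagate (function names are illustrative):

```python
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def safe_job(func):
    """Wrap a scheduled job so an exception is logged, not raised.

    Without this, one failing job crashes the while-True scheduler loop
    and every future run is silently lost.
    """
    def wrapper():
        try:
            func()
        except Exception:
            logger.exception(f"Job {func.__name__} failed; scheduler keeps running")
    return wrapper

def flaky_report():
    raise RuntimeError("database unavailable")

# With the schedule library you would register the wrapped job, e.g.:
# schedule.every().day.at("06:00").do(safe_job(flaky_report))
job = safe_job(flaky_report)
job()  # logs the error instead of raising
print("loop continues")
```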

Practice Questions

Task: Write a cron expression that runs a script every weekday (Mon-Fri) at 5:30 PM.

Solution:
# MIN HOUR DAY MONTH WEEKDAY command
30 17 * * 1-5 /usr/bin/python3 /path/to/script.py

# Explanation:
# 30 = minute 30
# 17 = 5 PM (24-hour format)
# * = any day of month
# * = any month
# 1-5 = Monday through Friday

Task: Modify a scheduled job to retry up to 3 times if it fails, with a 5-minute delay between retries.

Solution:
import time
import schedule
import logging

logger = logging.getLogger(__name__)

def run_with_retry(func, max_retries=3, delay=300):
    """Run a function with retry logic."""
    for attempt in range(max_retries):
        try:
            func()
            return  # Success
        except Exception as e:
            logger.error(f"Attempt {attempt + 1} failed: {e}")
            if attempt < max_retries - 1:
                logger.info(f"Retrying in {delay} seconds...")
                time.sleep(delay)
    logger.error("All retries exhausted")

def my_job():
    # Simulate occasional failure
    import random
    if random.random() < 0.5:
        raise Exception("Random failure")
    print("Job succeeded!")

# Schedule with retry wrapper
schedule.every().day.at("06:00").do(
    lambda: run_with_retry(my_job)
)
04

Automated Email Reports

Generating reports is only half the job - they need to reach stakeholders. Python's smtplib and email modules let you send automated emails with attachments, HTML formatting, and embedded charts.

Protocol

SMTP Email Sending

SMTP (Simple Mail Transfer Protocol) is the standard for sending emails. Python's smtplib connects to an SMTP server (Gmail, Outlook, corporate server) to deliver messages. For Gmail, you'll need an App Password if 2FA is enabled.

Sending a Simple Email

Start with a basic text email. Use environment variables or config files for credentials - never hardcode passwords in scripts.

import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import os

def send_email(subject: str, body: str, to_email: str):
    """Send a simple text email."""
    # Load credentials from environment variables
    smtp_server = os.getenv("SMTP_SERVER", "smtp.gmail.com")
    smtp_port = int(os.getenv("SMTP_PORT", 587))
    sender_email = os.getenv("SENDER_EMAIL")
    sender_password = os.getenv("SENDER_PASSWORD")
    
    # Create message
    msg = MIMEMultipart()
    msg["From"] = sender_email
    msg["To"] = to_email
    msg["Subject"] = subject
    msg.attach(MIMEText(body, "plain"))
    
    # Send email
    with smtplib.SMTP(smtp_server, smtp_port) as server:
        server.starttls()  # Enable encryption
        server.login(sender_email, sender_password)
        server.send_message(msg)
    
    print(f"Email sent to {to_email}")

# Usage
send_email(
    subject="Daily Sales Report",
    body="Sales totaled $45,230 today. See attached for details.",
    to_email="manager@company.com"
)

HTML Emails with Tables

HTML emails look more professional and can include formatted tables. Convert pandas DataFrames directly to HTML tables.

import os
import pandas as pd
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

def send_html_report(df: pd.DataFrame, to_email: str):
    """Send an HTML email with a data table."""
    # Convert DataFrame to HTML table
    table_html = df.to_html(index=False, classes="data-table")
    
    html_body = f"""
    <html>
    <head>
        <style>
            body {{ font-family: Arial, sans-serif; }}
            .data-table {{ border-collapse: collapse; width: 100%; }}
            .data-table th {{ background-color: #6366f1; color: white; padding: 10px; }}
            .data-table td {{ border: 1px solid #ddd; padding: 8px; }}
            .data-table tr:nth-child(even) {{ background-color: #f9f9f9; }}
            h2 {{ color: #333; }}
        </style>
    </head>
    <body>
        <h2>Daily Sales Report</h2>
        <p>Here is your automated daily summary:</p>
        {table_html}
        <p>Generated automatically by the Analytics Pipeline.</p>
    </body>
    </html>
    """
    
    msg = MIMEMultipart("alternative")
    msg["Subject"] = "Daily Sales Report"
    msg["From"] = os.getenv("SENDER_EMAIL")
    msg["To"] = to_email
    
    msg.attach(MIMEText(html_body, "html"))
    
    # Send via SMTP...

# Example data
sales_df = pd.DataFrame({
    "Product": ["Widget A", "Widget B", "Widget C"],
    "Units": [150, 230, 180],
    "Revenue": ["$4,500", "$6,900", "$5,400"]
})

send_html_report(sales_df, "team@company.com")

Email with Attachments

Attach CSV, Excel, or PDF reports to your automated emails. Use MIMEBase for binary attachments.

from email.mime.base import MIMEBase
from email import encoders
import os

def send_email_with_attachment(to_email: str, subject: str, body: str, attachment_path: str):
    """Send email with a file attachment."""
    msg = MIMEMultipart()
    msg["From"] = os.getenv("SENDER_EMAIL")
    msg["To"] = to_email
    msg["Subject"] = subject
    msg.attach(MIMEText(body, "plain"))
    
    # Attach file
    with open(attachment_path, "rb") as f:
        attachment = MIMEBase("application", "octet-stream")
        attachment.set_payload(f.read())
    
    encoders.encode_base64(attachment)
    filename = os.path.basename(attachment_path)
    attachment.add_header("Content-Disposition", f"attachment; filename={filename}")
    msg.attach(attachment)
    
    # Send via SMTP (same as before)
    with smtplib.SMTP("smtp.gmail.com", 587) as server:
        server.starttls()
        server.login(os.getenv("SENDER_EMAIL"), os.getenv("SENDER_PASSWORD"))
        server.send_message(msg)
    
    print(f"Email with attachment sent to {to_email}")

# Usage
send_email_with_attachment(
    to_email="manager@company.com",
    subject="Weekly Report - Week 42",
    body="Please find the weekly analytics report attached.",
    attachment_path="reports/weekly_report_w42.xlsx"
)
Security: Never commit email credentials to version control. Use environment variables, secrets managers, or encrypted config files.
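A common pattern for local development is a gitignored .env file holding KEY=VALUE pairs. As a rough sketch of how such loaders work (the python-dotenv library does this robustly in practice; file name and keys below are illustrative):

```python
import os
from pathlib import Path

def load_env(path: str = ".env") -> None:
    """Minimal .env loader: read KEY=VALUE lines into os.environ.

    Simplified sketch: ignores quoting and, unlike python-dotenv's
    default, overwrites existing environment values.
    """
    env_file = Path(path)
    if not env_file.exists():
        return
    for line in env_file.read_text().splitlines():
        line = line.strip()
        if line and not line.startswith("#") and "=" in line:
            key, _, value = line.partition("=")
            os.environ[key.strip()] = value.strip()

# A .env file (kept out of version control) would contain lines like:
# SENDER_EMAIL=reports@example.com
# SENDER_PASSWORD=app-password-here
Path(".env").write_text("SENDER_EMAIL=reports@example.com\n")
load_env()
print(os.environ["SENDER_EMAIL"])  # reports@example.com
```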

Practice Questions

Task: Modify the send_email function to accept a list of recipients.

Solution:
def send_email(subject: str, body: str, to_emails: list):
    """Send email to multiple recipients."""
    msg = MIMEMultipart()
    msg["From"] = sender_email
    msg["To"] = ", ".join(to_emails)  # Comma-separated
    msg["Subject"] = subject
    msg.attach(MIMEText(body, "plain"))
    
    with smtplib.SMTP("smtp.gmail.com", 587) as server:
        server.starttls()
        server.login(sender_email, sender_password)
        server.sendmail(sender_email, to_emails, msg.as_string())

# Usage
send_email(
    "Daily Report",
    "Report attached.",
    ["alice@co.com", "bob@co.com", "charlie@co.com"]
)

Task: Create a function that sends an alert email only if sales drop below a threshold.

Solution:
def check_and_alert(df: pd.DataFrame, threshold: float = 1000):
    """Send alert if daily sales fall below threshold."""
    # Parse dates first - CSV columns load as strings, which would never match a date
    dates = pd.to_datetime(df["date"]).dt.date
    today_sales = df.loc[dates == pd.Timestamp.today().date(), "revenue"].sum()
    
    if today_sales < threshold:
        send_email(
            subject="⚠️ LOW SALES ALERT",
            body=f"Today's sales (${today_sales:,.2f}) are below the ${threshold:,.2f} threshold.",
            to_email="alerts@company.com"
        )
        print("Alert sent!")
    else:
        print(f"Sales OK: ${today_sales:,.2f}")

# Run as part of daily pipeline
sales_df = pd.read_csv("daily_sales.csv")
check_and_alert(sales_df, threshold=5000)
05

Web Scraping for Data Collection

Not all data comes from databases or APIs. Web scraping extracts information from websites - competitor prices, public datasets, news articles, or any publicly available content. Python's BeautifulSoup and requests libraries make this straightforward.

Important

Web Scraping Ethics & Legality

  • Check robots.txt: Respect the site's scraping policy (e.g., example.com/robots.txt)
  • Terms of Service: Some sites explicitly prohibit scraping
  • Rate limiting: Don't overwhelm servers - add delays between requests
  • Public data only: Never scrape private or login-protected content without permission
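The first point can be automated with the standard library's urllib.robotparser. A minimal sketch - the rules string below stands in for a downloaded robots.txt, which you would normally fetch with RobotFileParser.set_url() and read():

```python
from urllib import robotparser

def allowed_to_scrape(url: str, robots_txt: str, user_agent: str = "*") -> bool:
    """Check whether robots.txt rules permit fetching this URL.

    Takes the robots.txt text directly for illustration; in a real
    script you would point the parser at the live file instead.
    """
    rp = robotparser.RobotFileParser()
    rp.parse(robots_txt.splitlines())
    return rp.can_fetch(user_agent, url)

# Example rules: everything allowed except /private/
rules = """
User-agent: *
Disallow: /private/
"""

print(allowed_to_scrape("https://example.com/products", rules))   # True
print(allowed_to_scrape("https://example.com/private/x", rules))  # False
```

Calling this before every scraping run makes the politeness check part of the pipeline rather than a manual step.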

Basic Web Scraping

The requests library fetches web pages, and BeautifulSoup parses HTML to extract data. Install them with pip install requests beautifulsoup4.

# pip install requests beautifulsoup4
import requests
from bs4 import BeautifulSoup

def scrape_page(url: str) -> BeautifulSoup:
    """Fetch a web page and return parsed HTML."""
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
    }
    
    response = requests.get(url, headers=headers)
    response.raise_for_status()  # Raise error for bad status codes
    
    return BeautifulSoup(response.text, "html.parser")

# Example: Scrape quotes from a demo site
soup = scrape_page("https://quotes.toscrape.com/")

# Find all quote elements
quotes = soup.find_all("div", class_="quote")

for quote in quotes[:3]:
    text = quote.find("span", class_="text").get_text()
    author = quote.find("small", class_="author").get_text()
    print(f'"{text}" - {author}')

Extracting Tables

Tables are common targets for scraping. BeautifulSoup can parse them, but pandas has a shortcut: pd.read_html() extracts all tables from a page into DataFrames.

import pandas as pd

# Automatically extract all tables from a webpage
url = "https://en.wikipedia.org/wiki/List_of_countries_by_GDP_(nominal)"
tables = pd.read_html(url)

print(f"Found {len(tables)} tables")

# Get the main data table (usually the largest)
gdp_df = tables[0]  # Adjust index based on page structure
print(gdp_df.head())

Scraping Multiple Pages

Many websites paginate content. Loop through pages and combine results. Always add delays to avoid overwhelming the server.

import requests
from bs4 import BeautifulSoup
import pandas as pd
import time

def scrape_all_pages(base_url: str, max_pages: int = 5) -> list:
    """Scrape multiple pages of results."""
    all_data = []
    
    for page in range(1, max_pages + 1):
        url = f"{base_url}?page={page}"
        print(f"Scraping page {page}...")
        
        response = requests.get(url)
        soup = BeautifulSoup(response.text, "html.parser")
        
        # Extract items from this page
        items = soup.find_all("div", class_="item")
        
        for item in items:
            all_data.append({
                "title": item.find("h2").get_text(strip=True),
                "price": item.find("span", class_="price").get_text(strip=True),
                "page": page
            })
        
        # Be polite - wait between requests
        time.sleep(1)
    
    return all_data

# Usage
data = scrape_all_pages("https://example-shop.com/products", max_pages=10)
df = pd.DataFrame(data)
df.to_csv("scraped_products.csv", index=False)

Handling Dynamic Content

Some websites load content via JavaScript after the initial page load. For these, you need selenium to control a real browser.

# pip install selenium webdriver-manager
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager

def scrape_dynamic_page(url: str) -> list:
    """Scrape a JavaScript-rendered page."""
    # Setup Chrome in headless mode
    options = webdriver.ChromeOptions()
    options.add_argument("--headless")
    
    driver = webdriver.Chrome(
        service=Service(ChromeDriverManager().install()),
        options=options
    )
    
    try:
        driver.get(url)
        
        # Wait for content to load
        WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.CLASS_NAME, "product-card"))
        )
        
        # Extract data
        products = driver.find_elements(By.CLASS_NAME, "product-card")
        data = []
        
        for product in products:
            data.append({
                "name": product.find_element(By.TAG_NAME, "h3").text,
                "price": product.find_element(By.CLASS_NAME, "price").text
            })
        
        return data
    
    finally:
        driver.quit()

# Usage
products = scrape_dynamic_page("https://dynamic-site.com/products")
Pro Tip: Always check if a site offers an API before scraping. APIs are more reliable, faster, and don't require parsing HTML.
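For comparison, here is what the API route might look like. The endpoint and response shape are hypothetical - check the site's developer documentation for the real ones:

```python
import requests

def fetch_products_via_api(base_url: str) -> list:
    """Fetch structured product data from a (hypothetical) JSON API."""
    response = requests.get(f"{base_url}/api/products", timeout=10)
    response.raise_for_status()
    return response.json()  # already structured - no HTML parsing needed

# products = fetch_products_via_api("https://example-shop.com")
```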

Practice Questions

Task: Write a function that returns all hyperlinks (href attributes) from a webpage.

Solution:
def get_all_links(url: str) -> list:
    """Extract all links from a webpage."""
    response = requests.get(url)
    soup = BeautifulSoup(response.text, "html.parser")
    
    links = []
    for a_tag in soup.find_all("a", href=True):
        links.append(a_tag["href"])
    
    return links

# Usage
links = get_all_links("https://example.com")
print(f"Found {len(links)} links")
for link in links[:5]:
    print(link)

Task: Modify the scraper to retry failed requests up to 3 times with exponential backoff.

Solution:
import time

def fetch_with_retry(url: str, max_retries: int = 3) -> str:
    """Fetch URL with exponential backoff retry."""
    for attempt in range(max_retries):
        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()
            return response.text
        except requests.RequestException as e:
            wait_time = 2 ** attempt  # 1, 2, 4 seconds
            print(f"Attempt {attempt + 1} failed: {e}")
            if attempt < max_retries - 1:
                print(f"Retrying in {wait_time} seconds...")
                time.sleep(wait_time)
    
    raise Exception(f"Failed to fetch {url} after {max_retries} attempts")

# Usage
html = fetch_with_retry("https://example.com")
soup = BeautifulSoup(html, "html.parser")

Task: Create a scraper that checks a product's price daily and sends an email alert if it drops below a target price.

Solution:
import requests
from bs4 import BeautifulSoup
import re

def check_price(url: str, target_price: float):
    """Check product price and alert if below target."""
    response = requests.get(url, headers={"User-Agent": "Mozilla/5.0"})
    soup = BeautifulSoup(response.text, "html.parser")
    
    # Find price (adjust selector for specific site)
    price_element = soup.find("span", class_="price")
    price_text = price_element.get_text()
    
    # Extract numeric price
    price = float(re.sub(r"[^\d.]", "", price_text))
    
    print(f"Current price: ${price:.2f}")
    
    if price <= target_price:
        send_email(
            subject=f"Price Alert: ${price:.2f}!",
            body=f"The product is now ${price:.2f} (target: ${target_price:.2f})\n{url}",
            to_email="alerts@company.com"
        )
        print("Alert sent!")
    else:
        print(f"Price still above target (${target_price:.2f})")

# Schedule to run daily
# check_price("https://shop.com/product/123", target_price=49.99)

Key Takeaways

ETL Pattern

Structure pipelines as Extract-Transform-Load with separate functions for testability and maintainability.

Scheduling Options

Use cron (Linux) or Task Scheduler (Windows) for OS-level scheduling, or Python's schedule/APScheduler for in-process jobs.

Automated Emails

Python's smtplib sends emails with HTML formatting, tables, and attachments for automated report distribution.

Web Scraping

BeautifulSoup parses static HTML; Selenium handles JavaScript-rendered pages. Always respect robots.txt and rate limits.

Error Handling

Use try/except, logging, and retry logic to make pipelines resilient. Never let a single failure crash the entire workflow.

Security First

Store credentials in environment variables or secrets managers. Never hardcode passwords or commit them to version control.

Knowledge Check

Test your understanding of analytics automation:

Question 1 of 6

What does ETL stand for in data pipeline terminology?

Question 2 of 6

Which cron expression runs a job every weekday at 9:00 AM?

Question 3 of 6

Which Python library is best for scraping JavaScript-rendered web pages?

Question 4 of 6

What is the recommended way to store email credentials in an automation script?

Question 5 of 6

What should you always check before scraping a website?

Question 6 of 6

Why should pipeline functions be idempotent?
