Introduction to Automation
Analysts spend up to 80% of their time on repetitive tasks: fetching data, cleaning it, generating reports, and sending updates. Automation eliminates this drudgery, letting you focus on insights rather than mechanics. Python's rich ecosystem makes it the ideal language for building automated analytics workflows.
What is Analytics Automation?
Analytics automation is the process of using scripts and tools to perform data tasks without manual intervention. This includes extracting data from sources, transforming it, generating visualizations, and distributing reports - all triggered automatically on a schedule or event.
Business Impact: Automated pipelines reduce errors, ensure consistency, and free analysts to focus on strategic analysis rather than data wrangling.
Why Automate?
Manual processes are error-prone and time-consuming. A single typo in a formula or a forgotten step can cascade into major issues. Automation provides repeatability, auditability, and scalability - run the same pipeline on 100 files as easily as on one.
- Time Savings: Turn hours of manual work into seconds of script execution.
- Consistency: The same process every time - no forgotten steps or human errors.
- Scalability: Process 10 files or 10,000 with the same script.
- Auditability: Code serves as documentation - track exactly what happened.
Automation Building Blocks
A typical analytics automation workflow combines several components: data extraction, transformation, output generation, scheduling, and notification. Python provides libraries for each:
| Task | Python Library | Purpose |
|---|---|---|
| Data extraction | pandas, requests, sqlalchemy | Read from files, APIs, databases |
| Transformation | pandas, numpy | Clean, filter, aggregate data |
| Visualization | matplotlib, seaborn, plotly | Generate charts and dashboards |
| Reporting | openpyxl, fpdf, jinja2 | Create Excel, PDF, HTML reports |
| Email | smtplib, email | Send automated notifications |
| Web scraping | beautifulsoup4, requests | Collect data from websites |
| Scheduling | schedule, APScheduler, cron | Run scripts at set times |
A Simple Automation Example
Here's a minimal script that reads sales data, calculates totals, and saves a summary. This is the foundation of any automated pipeline - input, process, output.
import pandas as pd
from datetime import datetime
# 1. Extract - Read data
df = pd.read_csv("daily_sales.csv")
# 2. Transform - Calculate metrics
summary = df.groupby("product")["revenue"].agg(["sum", "mean", "count"])
summary.columns = ["Total Revenue", "Avg Sale", "Transactions"]
# 3. Load - Save output with timestamp
timestamp = datetime.now().strftime("%Y%m%d_%H%M")
output_file = f"sales_summary_{timestamp}.csv"
summary.to_csv(output_file)
print(f"Report saved: {output_file}")
Practice Questions
Task: Modify the script to only include sales from the last 7 days.
df = pd.read_csv("daily_sales.csv")
df["date"] = pd.to_datetime(df["date"])
Show Solution
from datetime import datetime, timedelta
df = pd.read_csv("daily_sales.csv")
df["date"] = pd.to_datetime(df["date"])
# Filter last 7 days
cutoff = datetime.now() - timedelta(days=7)
df = df[df["date"] >= cutoff]
summary = df.groupby("product")["revenue"].sum()
print(summary)
Task: Add error handling so the script logs a warning if the input file is missing instead of crashing.
Show Solution
import pandas as pd
import logging
import os
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
input_file = "daily_sales.csv"
if not os.path.exists(input_file):
logger.warning(f"File not found: {input_file}")
else:
df = pd.read_csv(input_file)
summary = df.groupby("product")["revenue"].sum()
logger.info(f"Processed {len(df)} records")
print(summary)
Automating Data Pipelines
A data pipeline is a series of steps that move data from sources to destinations, transforming it along the way. Well-designed pipelines are modular, testable, and resilient to failures. This section shows how to structure Python scripts for maintainable automation.
Pipeline Design Principles
- Idempotent: Running the pipeline twice produces the same result - no duplicates (see the sketch after this list)
- Modular: Each step is a function that can be tested independently
- Logged: Every action is recorded for debugging and auditing
- Configurable: Paths, credentials, and parameters live in config files, not code
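For example, one way to keep a load step idempotent is to derive the output name from the data's date rather than the run time, so re-running a day's pipeline overwrites that day's file instead of creating a duplicate. A minimal sketch (the path convention is just an illustration):
import pandas as pd
from pathlib import Path
def load_idempotent(df: pd.DataFrame, report_date: str) -> None:
    """Write to a deterministic path so a re-run overwrites the same file."""
    out_dir = Path("processed")
    out_dir.mkdir(exist_ok=True)
    df.to_csv(out_dir / f"sales_{report_date}.csv", index=False)
# Running this twice for the same date yields one file, not two
# load_idempotent(df, "2024-06-01")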
Structuring a Pipeline Script
Organize your pipeline into clear functions for extraction, transformation, and loading. Use a main function to orchestrate the flow. This structure makes debugging and testing straightforward.
import pandas as pd
import logging
from pathlib import Path
from datetime import datetime
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[
logging.FileHandler("pipeline.log"),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
def extract(source_path: str) -> pd.DataFrame:
"""Extract data from source file."""
logger.info(f"Extracting from {source_path}")
df = pd.read_csv(source_path)
logger.info(f"Extracted {len(df)} rows")
return df
def transform(df: pd.DataFrame) -> pd.DataFrame:
"""Clean and transform data."""
logger.info("Starting transformation")
# Remove duplicates
initial_rows = len(df)
df = df.drop_duplicates()
logger.info(f"Removed {initial_rows - len(df)} duplicates")
# Handle missing values
df = df.fillna({"revenue": 0, "quantity": 0})
# Add calculated columns
df["total"] = df["revenue"] * df["quantity"]
return df
def load(df: pd.DataFrame, output_path: str) -> None:
"""Save transformed data to destination."""
logger.info(f"Loading to {output_path}")
df.to_csv(output_path, index=False)
logger.info(f"Saved {len(df)} rows")
def main():
"""Orchestrate the ETL pipeline."""
try:
# Configuration
source = "raw_data/sales_2024.csv"
timestamp = datetime.now().strftime("%Y%m%d")
output = f"processed/sales_clean_{timestamp}.csv"
# Run pipeline
data = extract(source)
data = transform(data)
load(data, output)
logger.info("Pipeline completed successfully")
except Exception as e:
logger.error(f"Pipeline failed: {e}")
raise
if __name__ == "__main__":
main()
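Because each step is a plain function, it can be unit-tested in isolation. A minimal sketch using pytest conventions (assuming the script above is saved as pipeline.py):
# test_pipeline.py (assumes the pipeline above is saved as pipeline.py)
import pandas as pd
from pipeline import transform
def test_transform_removes_duplicates_and_adds_total():
    raw = pd.DataFrame({
        "revenue": [10.0, 10.0, 5.0],
        "quantity": [1, 1, 2],
    })
    result = transform(raw)
    assert len(result) == 2            # duplicate row dropped
    assert "total" in result.columns   # calculated column added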
Configuration Files
Never hardcode file paths or credentials. Use YAML or JSON config files that can be changed without modifying code. This also allows different configurations for development vs production.
# config.yaml
# ---
# source_path: "data/raw/sales.csv"
# output_dir: "data/processed"
# database:
# host: "localhost"
# port: 5432
# name: "analytics"
import yaml
def load_config(config_path: str = "config.yaml") -> dict:
"""Load configuration from YAML file."""
with open(config_path, "r") as f:
return yaml.safe_load(f)
# Usage
config = load_config()
source = config["source_path"]
output_dir = config["output_dir"]
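To separate development from production, one simple convention (the file names here are hypothetical) is to pick the config file with an environment variable:
import os
env = os.getenv("PIPELINE_ENV", "dev")      # "dev" or "prod"
config = load_config(f"config.{env}.yaml")  # e.g., config.dev.yaml or config.prod.yaml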
Processing Multiple Files
Real pipelines often process batches of files. Use pathlib to iterate over directories and process each file with the same logic.
from pathlib import Path
import pandas as pd
def process_all_files(input_dir: str, output_dir: str) -> None:
"""Process all CSV files in a directory."""
input_path = Path(input_dir)
output_path = Path(output_dir)
output_path.mkdir(exist_ok=True)
# Find all CSV files
csv_files = list(input_path.glob("*.csv"))
print(f"Found {len(csv_files)} files to process")
for file in csv_files:
print(f"Processing: {file.name}")
# Read, transform, save
df = pd.read_csv(file)
df = df.drop_duplicates()
df["processed_date"] = pd.Timestamp.now()
# Save to output directory
output_file = output_path / f"clean_{file.name}"
df.to_csv(output_file, index=False)
print("All files processed!")
# Run
process_all_files("raw_data/", "processed_data/")
Database Integration
For production pipelines, data often comes from or goes to databases. SQLAlchemy provides a clean interface for database operations.
import pandas as pd
from sqlalchemy import create_engine
# Create database connection (credentials hardcoded here only for brevity; see below)
engine = create_engine("postgresql://user:pass@localhost:5432/analytics")
# Extract from database
query = """
SELECT product_id, SUM(revenue) as total_revenue
FROM sales
WHERE sale_date >= '2024-01-01'
GROUP BY product_id
"""
df = pd.read_sql(query, engine)
# Transform
df["revenue_category"] = pd.cut(
df["total_revenue"],
bins=[0, 1000, 10000, float("inf")],
labels=["Low", "Medium", "High"]
)
# Load back to database
df.to_sql("product_summary", engine, if_exists="replace", index=False)
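The connection string above hardcodes credentials for brevity. In practice, assemble it from environment variables, just like the email credentials later in this module. A minimal sketch (the variable names are placeholders - match whatever your deployment defines):
import os
from sqlalchemy import create_engine
# Hypothetical variable names; defaults are for local development only
db_url = (
    f"postgresql://{os.getenv('DB_USER')}:{os.getenv('DB_PASSWORD')}"
    f"@{os.getenv('DB_HOST', 'localhost')}:{os.getenv('DB_PORT', '5432')}"
    f"/{os.getenv('DB_NAME', 'analytics')}"
)
engine = create_engine(db_url)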
Practice Questions
Task: Modify the transform function to raise an error if the DataFrame has fewer than 10 rows after cleaning.
Show Solution
def transform(df: pd.DataFrame) -> pd.DataFrame:
df = df.drop_duplicates()
df = df.dropna()
if len(df) < 10:
raise ValueError(f"Too few rows after cleaning: {len(df)}")
return df
Task: Create a function that returns a dictionary with pipeline statistics: rows processed, duplicates removed, and processing time.
Show Solution
import time
def run_pipeline_with_stats(source: str) -> dict:
start_time = time.time()
df = pd.read_csv(source)
initial_rows = len(df)
df = df.drop_duplicates()
final_rows = len(df)
elapsed = time.time() - start_time
return {
"source": source,
"initial_rows": initial_rows,
"final_rows": final_rows,
"duplicates_removed": initial_rows - final_rows,
"processing_time_sec": round(elapsed, 2)
}
stats = run_pipeline_with_stats("sales.csv")
print(stats)
Task: Modify the pipeline to only process files modified since the last run. Store the last run timestamp in a JSON file.
Show Solution
import json
import os
import pandas as pd
from datetime import datetime
from pathlib import Path
STATE_FILE = "pipeline_state.json"
def get_last_run() -> datetime:
if os.path.exists(STATE_FILE):
with open(STATE_FILE, "r") as f:
state = json.load(f)
return datetime.fromisoformat(state["last_run"])
return datetime.min
def save_run_time():
with open(STATE_FILE, "w") as f:
json.dump({"last_run": datetime.now().isoformat()}, f)
def process_new_files(input_dir: str):
last_run = get_last_run()
for file in Path(input_dir).glob("*.csv"):
file_mtime = datetime.fromtimestamp(file.stat().st_mtime)
if file_mtime > last_run:
print(f"Processing new file: {file.name}")
df = pd.read_csv(file)
# ... process df
else:
print(f"Skipping unchanged: {file.name}")
save_run_time()
process_new_files("data/")
Scheduling Tasks
A pipeline is only useful if it runs automatically. Task schedulers trigger your scripts at specified times - daily reports, hourly data refreshes, or weekly summaries. This section covers both OS-level schedulers (cron, Task Scheduler) and Python-based scheduling.
Scheduling Options
- Cron (Linux/Mac): Built-in OS scheduler with powerful time expressions
- Task Scheduler (Windows): GUI-based scheduler for Windows systems
- Python schedule library: Simple in-process scheduling for scripts
- APScheduler: Advanced Python scheduler with persistence
Cron Jobs (Linux/Mac)
Cron is the standard task scheduler on Unix systems. Each cron job has five time fields followed by the command to run. Use crontab -e to edit your schedule.
# Cron time format:
# MIN HOUR DAY MONTH WEEKDAY command
# Run daily at 6:00 AM
0 6 * * * /usr/bin/python3 /home/user/scripts/daily_report.py
# Run every Monday at 9:00 AM
0 9 * * 1 /usr/bin/python3 /home/user/scripts/weekly_summary.py
# Run every hour
0 * * * * /usr/bin/python3 /home/user/scripts/hourly_check.py
# Run every 15 minutes
*/15 * * * * /usr/bin/python3 /home/user/scripts/data_sync.py
# Run on the 1st of every month at midnight
0 0 1 * * /usr/bin/python3 /home/user/scripts/monthly_archive.py
| Field | Values | Special Characters |
|---|---|---|
| Minute | 0-59 | * (any), */n (every n), , (list) |
| Hour | 0-23 | - (range), e.g., 9-17 |
| Day of Month | 1-31 | |
| Month | 1-12 | |
| Day of Week | 0-6 (Sun = 0) | |
Fields can be combined: 0 9,17 * * * runs a job at 9 AM and 5 PM every day.
Tip: append >> /var/log/myscript.log 2>&1 to any job to capture its output and errors in a log file for later review.
Windows Task Scheduler
Windows Task Scheduler provides a GUI for scheduling tasks. You can also create tasks via PowerShell for automation.
# PowerShell: Create a scheduled task to run daily at 8 AM
$action = New-ScheduledTaskAction -Execute "python" -Argument "C:\Scripts\daily_report.py"
$trigger = New-ScheduledTaskTrigger -Daily -At 8:00AM
$settings = New-ScheduledTaskSettingsSet -StartWhenAvailable
Register-ScheduledTask -TaskName "DailyAnalyticsReport" `
-Action $action `
-Trigger $trigger `
-Settings $settings `
-Description "Generate daily analytics report"
# View existing tasks
Get-ScheduledTask | Where-Object {$_.TaskName -like "*Analytics*"}
# Run a task manually
Start-ScheduledTask -TaskName "DailyAnalyticsReport"
Python schedule Library
For simpler needs, the schedule library lets you define schedules directly in Python. The script must stay running for scheduled jobs to execute.
# pip install schedule
import schedule
import time
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def generate_report():
logger.info("Generating daily report...")
# Your pipeline code here
logger.info("Report complete!")
def sync_data():
logger.info("Syncing data...")
# Data sync logic here
# Schedule jobs
schedule.every().day.at("06:00").do(generate_report)
schedule.every().monday.at("09:00").do(generate_report)
schedule.every(15).minutes.do(sync_data)
schedule.every().hour.do(sync_data)
# Run the scheduler
logger.info("Scheduler started. Press Ctrl+C to exit.")
while True:
schedule.run_pending()
time.sleep(60) # Check every minute
APScheduler for Production
APScheduler is more robust for production use. It supports multiple backends (memory, database, Redis) and can persist schedules across restarts.
# pip install apscheduler
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from datetime import datetime
scheduler = BlockingScheduler()
def morning_report():
print(f"[{datetime.now()}] Running morning report")
def hourly_check():
print(f"[{datetime.now()}] Running hourly check")
# Add jobs with cron-like syntax
scheduler.add_job(morning_report, CronTrigger(hour=6, minute=0))
scheduler.add_job(hourly_check, CronTrigger(minute=0)) # Every hour
# Add job with interval
scheduler.add_job(
hourly_check,
"interval",
minutes=30,
id="data_sync"
)
print("Scheduler starting...")
scheduler.start()
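To persist schedules across restarts, as noted above, configure a job store. A minimal sketch using the SQLAlchemy job store with a local SQLite file (pip install sqlalchemy; the job id is an example):
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
# Jobs saved in jobs.sqlite survive process restarts
scheduler = BlockingScheduler(
    jobstores={"default": SQLAlchemyJobStore(url="sqlite:///jobs.sqlite")}
)
# Give persistent jobs a stable id so re-registering replaces rather than duplicates
scheduler.add_job(morning_report, "cron", hour=6, id="morning_report", replace_existing=True)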
Practice Questions
Task: Write a cron expression that runs a script every weekday (Mon-Fri) at 5:30 PM.
Show Solution
# MIN HOUR DAY MONTH WEEKDAY command
30 17 * * 1-5 /usr/bin/python3 /path/to/script.py
# Explanation:
# 30 = minute 30
# 17 = 5 PM (24-hour format)
# * = any day of month
# * = any month
# 1-5 = Monday through Friday
Task: Modify a scheduled job to retry up to 3 times if it fails, with a 5-minute delay between retries.
Show Solution
import time
import schedule
import logging
logger = logging.getLogger(__name__)
def run_with_retry(func, max_retries=3, delay=300):
"""Run a function with retry logic."""
for attempt in range(max_retries):
try:
func()
return # Success
except Exception as e:
logger.error(f"Attempt {attempt + 1} failed: {e}")
if attempt < max_retries - 1:
logger.info(f"Retrying in {delay} seconds...")
time.sleep(delay)
logger.error("All retries exhausted")
def my_job():
# Simulate occasional failure
import random
if random.random() < 0.5:
raise Exception("Random failure")
print("Job succeeded!")
# Schedule with retry wrapper
schedule.every().day.at("06:00").do(
lambda: run_with_retry(my_job)
)
Automated Email Reports
Generating reports is only half the job - they need to reach stakeholders. Python's smtplib and email modules let you send automated emails with attachments, HTML formatting, and embedded charts.
SMTP Email Sending
SMTP (Simple Mail Transfer Protocol) is the standard for sending emails. Python's smtplib connects to an SMTP server (Gmail, Outlook, corporate server) to deliver messages. For Gmail, you'll need an App Password, which requires two-factor authentication to be enabled on the account.
Sending a Simple Email
Start with a basic text email. Use environment variables or config files for credentials - never hardcode passwords in scripts.
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import os
def send_email(subject: str, body: str, to_email: str):
"""Send a simple text email."""
# Load credentials from environment variables
smtp_server = os.getenv("SMTP_SERVER", "smtp.gmail.com")
smtp_port = int(os.getenv("SMTP_PORT", 587))
sender_email = os.getenv("SENDER_EMAIL")
sender_password = os.getenv("SENDER_PASSWORD")
# Create message
msg = MIMEMultipart()
msg["From"] = sender_email
msg["To"] = to_email
msg["Subject"] = subject
msg.attach(MIMEText(body, "plain"))
# Send email
with smtplib.SMTP(smtp_server, smtp_port) as server:
server.starttls() # Enable encryption
server.login(sender_email, sender_password)
server.send_message(msg)
print(f"Email sent to {to_email}")
# Usage
send_email(
subject="Daily Sales Report",
body="Sales totaled $45,230 today. See attached for details.",
to_email="manager@company.com"
)
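Rather than exporting these environment variables by hand each session, one common convention (an assumption here, not something this module relies on) is a local .env file loaded with the python-dotenv package (pip install python-dotenv). Keep the file out of version control:
# .env (never commit this file)
# SENDER_EMAIL=reports@example.com
# SENDER_PASSWORD=app-password-here
import os
from dotenv import load_dotenv
load_dotenv()  # reads key=value pairs from .env into the environment
sender = os.getenv("SENDER_EMAIL")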
HTML Emails with Tables
HTML emails look more professional and can include formatted tables. Convert pandas DataFrames directly to HTML tables.
import os
import pandas as pd
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
def send_html_report(df: pd.DataFrame, to_email: str):
"""Send an HTML email with a data table."""
# Convert DataFrame to HTML table
table_html = df.to_html(index=False, classes="data-table")
html_body = f"""
<html>
<head>
<style>
body {{ font-family: Arial, sans-serif; }}
.data-table {{ border-collapse: collapse; width: 100%; }}
.data-table th {{ background-color: #6366f1; color: white; padding: 10px; }}
.data-table td {{ border: 1px solid #ddd; padding: 8px; }}
.data-table tr:nth-child(even) {{ background-color: #f9f9f9; }}
h2 {{ color: #333; }}
</style>
</head>
<body>
<h2>Daily Sales Report</h2>
<p>Here is your automated daily summary:</p>
{table_html}
<p>Generated automatically by the Analytics Pipeline.</p>
</body>
</html>
"""
msg = MIMEMultipart("alternative")
msg["Subject"] = "Daily Sales Report"
msg["From"] = sender_email
msg["To"] = to_email
msg.attach(MIMEText(html_body, "html"))
# Send via SMTP...
# Example data
sales_df = pd.DataFrame({
"Product": ["Widget A", "Widget B", "Widget C"],
"Units": [150, 230, 180],
"Revenue": ["$4,500", "$6,900", "$5,400"]
})
send_html_report(sales_df, "team@company.com")
Email with Attachments
Attach CSV, Excel, or PDF reports to your automated emails. Use MIMEBase for binary attachments.
import os
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email import encoders
def send_email_with_attachment(to_email: str, subject: str, body: str, attachment_path: str):
"""Send email with a file attachment."""
msg = MIMEMultipart()
msg["From"] = os.getenv("SENDER_EMAIL")
msg["To"] = to_email
msg["Subject"] = subject
msg.attach(MIMEText(body, "plain"))
# Attach file
with open(attachment_path, "rb") as f:
attachment = MIMEBase("application", "octet-stream")
attachment.set_payload(f.read())
encoders.encode_base64(attachment)
filename = os.path.basename(attachment_path)
attachment.add_header("Content-Disposition", f"attachment; filename={filename}")
msg.attach(attachment)
# Send via SMTP (same as before)
with smtplib.SMTP("smtp.gmail.com", 587) as server:
server.starttls()
server.login(os.getenv("SENDER_EMAIL"), os.getenv("SENDER_PASSWORD"))
server.send_message(msg)
print(f"Email with attachment sent to {to_email}")
# Usage
send_email_with_attachment(
to_email="manager@company.com",
subject="Weekly Report - Week 42",
body="Please find the weekly analytics report attached.",
attachment_path="reports/weekly_report_w42.xlsx"
)
Practice Questions
Task: Modify the send_email function to accept a list of recipients.
Show Solution
def send_email(subject: str, body: str, to_emails: list):
"""Send email to multiple recipients."""
msg = MIMEMultipart()
msg["From"] = sender_email
msg["To"] = ", ".join(to_emails) # Comma-separated
msg["Subject"] = subject
msg.attach(MIMEText(body, "plain"))
with smtplib.SMTP("smtp.gmail.com", 587) as server:
server.starttls()
server.login(sender_email, sender_password)
server.sendmail(sender_email, to_emails, msg.as_string())
# Usage
send_email(
"Daily Report",
"Report attached.",
["alice@co.com", "bob@co.com", "charlie@co.com"]
)
Task: Create a function that sends an alert email only if sales drop below a threshold.
Show Solution
def check_and_alert(df: pd.DataFrame, threshold: float = 1000):
"""Send alert if daily sales fall below threshold."""
df["date"] = pd.to_datetime(df["date"])  # ensure dates are parsed, not strings
today_sales = df[df["date"].dt.date == pd.Timestamp.today().date()]["revenue"].sum()
if today_sales < threshold:
send_email(
subject="⚠️ LOW SALES ALERT",
body=f"Today's sales (${today_sales:,.2f}) are below the ${threshold:,.2f} threshold.",
to_email="alerts@company.com"
)
print("Alert sent!")
else:
print(f"Sales OK: ${today_sales:,.2f}")
# Run as part of daily pipeline
sales_df = pd.read_csv("daily_sales.csv")
check_and_alert(sales_df, threshold=5000)
Web Scraping for Data Collection
Not all data comes from databases or APIs. Web scraping extracts information from websites - competitor prices, public datasets, news articles, or any publicly available content. Python's BeautifulSoup and requests libraries make this straightforward.
Web Scraping Ethics & Legality
- Check robots.txt: Respect the site's scraping policy (e.g., example.com/robots.txt) - see the check after this list
- Terms of Service: Some sites explicitly prohibit scraping
- Rate limiting: Don't overwhelm servers - add delays between requests
- Public data only: Never scrape private or login-protected content without permission
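Python's standard library can perform the robots.txt check for you. A minimal sketch using urllib.robotparser (the default user agent here is an example):
from urllib.parse import urlparse
from urllib.robotparser import RobotFileParser
def allowed_to_scrape(url: str, user_agent: str = "*") -> bool:
    """Check the site's robots.txt before fetching a page."""
    parsed = urlparse(url)
    rp = RobotFileParser()
    rp.set_url(f"{parsed.scheme}://{parsed.netloc}/robots.txt")
    rp.read()
    return rp.can_fetch(user_agent, url)
# Usage
print(allowed_to_scrape("https://quotes.toscrape.com/page/1/"))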
Basic Web Scraping
The requests library fetches web pages, and BeautifulSoup parses HTML to extract data. Install them with pip install requests beautifulsoup4.
# pip install requests beautifulsoup4
import requests
from bs4 import BeautifulSoup
def scrape_page(url: str) -> BeautifulSoup:
"""Fetch a web page and return parsed HTML."""
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
}
response = requests.get(url, headers=headers)
response.raise_for_status() # Raise error for bad status codes
return BeautifulSoup(response.text, "html.parser")
# Example: Scrape quotes from a demo site
soup = scrape_page("https://quotes.toscrape.com/")
# Find all quote elements
quotes = soup.find_all("div", class_="quote")
for quote in quotes[:3]:
text = quote.find("span", class_="text").get_text()
author = quote.find("small", class_="author").get_text()
print(f'"{text}" - {author}')
Extracting Tables
Tables are common targets for scraping. BeautifulSoup can parse them, but pandas has a shortcut: pd.read_html() extracts all tables from a page into DataFrames (it requires an HTML parser such as lxml: pip install lxml).
import pandas as pd
# Automatically extract all tables from a webpage
url = "https://en.wikipedia.org/wiki/List_of_countries_by_GDP_(nominal)"
tables = pd.read_html(url)
print(f"Found {len(tables)} tables")
# Get the main data table (usually the largest)
gdp_df = tables[0] # Adjust index based on page structure
print(gdp_df.head())
Scraping Multiple Pages
Many websites paginate content. Loop through pages and combine results. Always add delays to avoid overwhelming the server.
import requests
from bs4 import BeautifulSoup
import pandas as pd
import time
def scrape_all_pages(base_url: str, max_pages: int = 5) -> list:
"""Scrape multiple pages of results."""
all_data = []
for page in range(1, max_pages + 1):
url = f"{base_url}?page={page}"
print(f"Scraping page {page}...")
response = requests.get(url)
soup = BeautifulSoup(response.text, "html.parser")
# Extract items from this page
items = soup.find_all("div", class_="item")
for item in items:
all_data.append({
"title": item.find("h2").get_text(strip=True),
"price": item.find("span", class_="price").get_text(strip=True),
"page": page
})
# Be polite - wait between requests
time.sleep(1)
return all_data
# Usage
data = scrape_all_pages("https://example-shop.com/products", max_pages=10)
df = pd.DataFrame(data)
df.to_csv("scraped_products.csv", index=False)
Handling Dynamic Content
Some websites load content via JavaScript after the initial page load. For these, you need selenium to control a real browser.
# pip install selenium webdriver-manager
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager
def scrape_dynamic_page(url: str) -> list:
"""Scrape a JavaScript-rendered page."""
# Setup Chrome in headless mode
options = webdriver.ChromeOptions()
options.add_argument("--headless")
driver = webdriver.Chrome(
service=Service(ChromeDriverManager().install()),
options=options
)
try:
driver.get(url)
# Wait for content to load
WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.CLASS_NAME, "product-card"))
)
# Extract data
products = driver.find_elements(By.CLASS_NAME, "product-card")
data = []
for product in products:
data.append({
"name": product.find_element(By.TAG_NAME, "h3").text,
"price": product.find_element(By.CLASS_NAME, "price").text
})
return data
finally:
driver.quit()
# Usage
products = scrape_dynamic_page("https://dynamic-site.com/products")
Practice Questions
Task: Write a function that returns all hyperlinks (href attributes) from a webpage.
Show Solution
def get_all_links(url: str) -> list:
"""Extract all links from a webpage."""
response = requests.get(url)
soup = BeautifulSoup(response.text, "html.parser")
links = []
for a_tag in soup.find_all("a", href=True):
links.append(a_tag["href"])
return links
# Usage
links = get_all_links("https://example.com")
print(f"Found {len(links)} links")
for link in links[:5]:
print(link)
Task: Modify the scraper to retry failed requests up to 3 times with exponential backoff.
Show Solution
import time
import requests
def fetch_with_retry(url: str, max_retries: int = 3) -> str:
"""Fetch URL with exponential backoff retry."""
for attempt in range(max_retries):
try:
response = requests.get(url, timeout=10)
response.raise_for_status()
return response.text
except requests.RequestException as e:
wait_time = 2 ** attempt # 1, 2, 4 seconds
print(f"Attempt {attempt + 1} failed: {e}")
if attempt < max_retries - 1:
print(f"Retrying in {wait_time} seconds...")
time.sleep(wait_time)
raise Exception(f"Failed to fetch {url} after {max_retries} attempts")
# Usage
html = fetch_with_retry("https://example.com")
soup = BeautifulSoup(html, "html.parser")
Task: Create a scraper that checks a product's price daily and sends an email alert if it drops below a target price.
Show Solution
import requests
from bs4 import BeautifulSoup
import re
def check_price(url: str, target_price: float):
"""Check product price and alert if below target."""
response = requests.get(url, headers={"User-Agent": "Mozilla/5.0"})
soup = BeautifulSoup(response.text, "html.parser")
# Find price (adjust selector for specific site)
price_element = soup.find("span", class_="price")
price_text = price_element.get_text()
# Extract numeric price
price = float(re.sub(r"[^\d.]", "", price_text))
print(f"Current price: ${price:.2f}")
if price <= target_price:
send_email(
subject=f"Price Alert: ${price:.2f}!",
body=f"The product is now ${price:.2f} (target: ${target_price:.2f})\n{url}",
to_email="alerts@company.com"
)
print("Alert sent!")
else:
print(f"Price still above target (${target_price:.2f})")
# Schedule to run daily
# check_price("https://shop.com/product/123", target_price=49.99)
Key Takeaways
- ETL Pattern: Structure pipelines as Extract-Transform-Load with separate functions for testability and maintainability.
- Scheduling Options: Use cron (Linux/Mac) or Task Scheduler (Windows) for OS-level scheduling, or Python's schedule/APScheduler for in-process jobs.
- Automated Emails: Python's smtplib sends emails with HTML formatting, tables, and attachments for automated report distribution.
- Web Scraping: BeautifulSoup parses static HTML; Selenium handles JavaScript-rendered pages. Always respect robots.txt and rate limits.
- Error Handling: Use try/except, logging, and retry logic to make pipelines resilient. Never let a single failure crash the entire workflow.
- Security First: Store credentials in environment variables or secrets managers. Never hardcode passwords or commit them to version control.
Knowledge Check
Test your understanding of analytics automation:
What does ETL stand for in data pipeline terminology?
Which cron expression runs a job every weekday at 9:00 AM?
Which Python library is best for scraping JavaScript-rendered web pages?
What is the recommended way to store email credentials in an automation script?
What should you always check before scraping a website?
Why should pipeline functions be idempotent?