pyairbyte
Let's create a complete, production-ready PyAirbyte-based automation suite. This will be a significant upgrade from the direct SDK approach - it'll handle all the API complexity for you.
Complete PyAirbyte-Based Automation Guide
Part 1: Environment Setup
bash
# 1. Create and navigate to your project directory
mkdir ~/amazon-ads-automation
cd ~/amazon-ads-automation
# 2. Create a virtual environment (isolates dependencies)
python3 -m venv venv_312
# 3. Activate the virtual environment
# On macOS/Linux:
source venv_312/bin/activate
# On Windows:
# venv\Scripts\activate
# 4. Verify you're in the virtual environment
# You should see (venv) at the beginning of your terminal prompt
which python # Should point to your venv directory
# 5. Upgrade pip to latest version
pip install --upgrade pip
# 6. Create requirements.txt file
cat > requirements.txt << 'EOF'
# Core PyAirbyte - the magic sauce
airbyte>=0.10.0
# Data manipulation
pandas>=2.0.0
numpy>=1.24.0
# Excel handling
openpyxl>=3.1.0
xlsxwriter>=3.1.0 # Better Excel formatting support
# Environment and config
python-dotenv>=1.0.0
# CLI and scheduling
click>=8.1.0
schedule>=1.2.0
# Logging and debugging
loguru>=0.7.0
EOF
# 7. Install all dependencies
pip install -r requirements.txt
# 8. Verify PyAirbyte installed correctly
python -c "import airbyte; print(f'✅ PyAirbyte version: {airbyte.__version__}')"

Part 2: Project Structure
Create the complete project structure:
bash
# Create directory structure
mkdir -p config src scripts data/raw data/cache output/reports logs tests
# Create __init__.py files to make Python packages
touch src/__init__.py
touch tests/__init__.py
# Create .gitignore
cat > .gitignore << 'EOF'
# Virtual environment
venv/
.env
__pycache__/
*.pyc
# Data files
data/raw/*
data/cache/*
!data/raw/.gitkeep
!data/cache/.gitkeep
# Output files
output/reports/*
!output/reports/.gitkeep
# Logs
logs/*
!logs/.gitkeep
# IDE files
.vscode/
.idea/
*.swp
*.swo
# OS files
.DS_Store
Thumbs.db
EOF
# Create placeholder directories
touch data/raw/.gitkeep data/cache/.gitkeep output/reports/.gitkeep logs/.gitkeep

Part 3: Configuration Files
config/settings.py
python
# config/settings.py
from enum import Enum
from datetime import datetime, timedelta
class Region(Enum):
    """Amazon Advertising API regions supported by the connector."""

    NA = "NA"  # North America
    EU = "EU"  # Europe
    FE = "FE"  # Far East (Japan/Australia)
class ReportType(Enum):
    """Report categories; each maps onto one sheet of the workbook."""

    CAMPAIGNS = "campaigns"
    TARGETING = "targeting"
    SEARCH_TERMS = "search_terms"
    AD_GROUPS = "ad_groups"
    PRODUCT_ADS = "product_ads"
class Config:
    """Central configuration shared by every module in the project."""

    # API settings
    DEFAULT_REGION = Region.NA
    MARKETPLACE_IDS = {
        Region.NA: ["ATVPDKIKX0DER"],   # US
        Region.EU: ["A1PA6795UKMFR9"],  # UK
        Region.FE: ["A1VC38T8ZF5NL"],   # JP
    }

    # Data settings
    LOOK_BACK_WINDOW = 30        # days of history for the initial pull
    REPORT_TIMEOUT_MINUTES = 30
    REPORT_MAX_RETRIES = 5

    # PyAirbyte stream names, keyed by report type
    STREAMS = {
        ReportType.CAMPAIGNS: "sponsored_product_campaigns",
        ReportType.TARGETING: "sponsored_product_targetings",
        ReportType.SEARCH_TERMS: "sponsored_product_search_term_report",
        ReportType.AD_GROUPS: "sponsored_product_ad_groups",
        ReportType.PRODUCT_ADS: "sponsored_product_product_ads",
    }

    # Excel sheet names, keyed by report type
    EXCEL_SHEETS = {
        ReportType.CAMPAIGNS: "Campaigns-9",
        ReportType.TARGETING: "Targeting",
        ReportType.SEARCH_TERMS: "Search Terms",
    }

    # File paths (relative to the project root)
    RAW_DATA_DIR = "data/raw"
    CACHE_DIR = "data/cache"
    OUTPUT_DIR = "output/reports"
    LOG_DIR = "logs"

    # Date formats
    DATE_FORMAT = "%Y-%m-%d"
    FILE_DATE_FORMAT = "%Y%m%d"
    EXCEL_DATE_FORMAT = "mm/dd/yyyy"
bash
# .env - Amazon Ads API Credentials
# Fill these in when your API access is approved!
# Required - from LWA console
CLIENT_ID=your_client_id_here
CLIENT_SECRET=your_client_secret_here
# Required - from OAuth flow
REFRESH_TOKEN=your_refresh_token_here
# Required - from profiles API call
PROFILE_IDS=123456789 # Comma-separated if multiple
# Region: NA, EU, or FE
REGION=NA
# Optional settings
LOOK_BACK_DAYS=60
LOG_LEVEL=INFO

Part 4: Core Modules
src/auth.py
python
# src/auth.py
import os
from typing import Dict, List
from dotenv import load_dotenv
from loguru import logger
# Load environment variables
load_dotenv()
class Credentials:
    """Secure credential management backed by environment variables."""

    # Read once from the environment at import time (after load_dotenv()).
    CLIENT_ID = os.getenv("CLIENT_ID", "")
    CLIENT_SECRET = os.getenv("CLIENT_SECRET", "")
    REFRESH_TOKEN = os.getenv("REFRESH_TOKEN", "")
    PROFILE_IDS = os.getenv("PROFILE_IDS", "")
    REGION = os.getenv("REGION", "NA")

    @classmethod
    def validate(cls) -> bool:
        """Return True when every required credential appears configured.

        A value that still contains the .env template text (e.g.
        "your_client_id") counts as missing.
        """
        checks = [
            ("CLIENT_ID", cls.CLIENT_ID, "your_client_id"),
            ("CLIENT_SECRET", cls.CLIENT_SECRET, "your_client_secret"),
            ("REFRESH_TOKEN", cls.REFRESH_TOKEN, "your_refresh_token"),
        ]
        missing = [name for name, value, template in checks
                   if not value or template in value]
        if not cls.PROFILE_IDS:
            missing.append("PROFILE_IDS")
        if missing:
            logger.warning(f"Missing credentials: {', '.join(missing)}")
            logger.warning("Please update your .env file with actual credentials once approved.")
            return False
        return True

    @classmethod
    def get_profile_list(cls) -> List[int]:
        """Parse the comma-separated PROFILE_IDS string into a list of ints."""
        if not cls.PROFILE_IDS:
            return []
        return [int(chunk.strip()) for chunk in cls.PROFILE_IDS.split(",")]

    @classmethod
    def get_api_config(cls) -> Dict:
        """Build the full source configuration dict expected by PyAirbyte."""
        return {
            "auth_type": "oauth2.0",
            "client_id": cls.CLIENT_ID,
            "client_secret": cls.CLIENT_SECRET,
            "refresh_token": cls.REFRESH_TOKEN,
            "region": cls.REGION,
            "profiles": cls.get_profile_list(),
            "start_date": "2024-01-01",  # Will be overridden in code
            "state_filter": ["enabled", "paused"],
            "look_back_window": 30,
            "report_wait_timeout": 30,
            "report_generation_max_retries": 5,
        }
python
# src/api_client.py
import airbyte as ab
import pandas as pd
from datetime import datetime, timedelta
from typing import Optional, List, Dict
from loguru import logger
from config.settings import Config, ReportType
from src.auth import Credentials
class AmazonAdsClient:
    """
    PyAirbyte-powered Amazon Ads API client.

    Wraps the ``source-amazon-ads`` connector: configuration, stream
    selection, cached extraction, and DataFrame access. Pagination, rate
    limiting, and retries are delegated to PyAirbyte.
    """

    def __init__(self):
        self.credentials = Credentials()
        self.source = None   # set by configure_source()
        self.cache = None    # set by read_to_cache()
        self.config = Config()

    def configure_source(self, start_date: Optional[datetime] = None):
        """
        Configure the PyAirbyte source connector and verify connectivity.

        Args:
            start_date: Earliest report date to request. Defaults to
                ``LOOK_BACK_WINDOW`` days before now.

        Returns:
            True when the connection check succeeds, False otherwise.
        """
        if not self.credentials.validate():
            logger.warning("Using placeholder configuration for testing")

        base_config = self.credentials.get_api_config()

        # Default the start date to the configured look-back window.
        if start_date is None:
            start_date = datetime.now() - timedelta(days=self.config.LOOK_BACK_WINDOW)
        base_config["start_date"] = start_date.strftime("%Y-%m-%d")

        # Marketplace IDs follow the configured region, falling back to US.
        base_config["marketplace_ids"] = self.config.MARKETPLACE_IDS.get(
            self.config.DEFAULT_REGION,
            ["ATVPDKIKX0DER"]
        )

        logger.info(f"Configuring PyAirbyte source for region: {base_config['region']}")
        logger.info(f"Start date: {base_config['start_date']}")
        logger.info(f"Profiles: {base_config['profiles']}")

        try:
            # One call replaces all of the hand-written API plumbing.
            self.source = ab.get_source(
                "source-amazon-ads",
                install_if_missing=True,
                config=base_config
            )
            # Run the connector's built-in connection check.
            check_result = self.source.check()
            if check_result.get("status") == "SUCCEEDED":
                logger.success("✅ Successfully connected to Amazon Ads API")
                return True
            logger.error(f"❌ Connection failed: {check_result}")
            return False
        except Exception as e:
            logger.error(f"❌ Failed to configure source: {str(e)}")
            return False

    def list_available_streams(self) -> List[str]:
        """Return every stream name the connector exposes (logs the first 10)."""
        if not self.source:
            logger.error("Source not configured. Call configure_source() first.")
            return []
        streams = self.source.get_available_streams()
        logger.info(f"📊 Available streams: {len(streams)} total")
        for i, stream in enumerate(streams[:10]):
            logger.info(f" - {stream}")
        return streams

    def select_streams(self, stream_names: List[str]):
        """Mark the given streams for extraction on the next read."""
        if not self.source:
            logger.error("Source not configured")
            return
        for stream in stream_names:
            self.source.select_stream(stream)
            logger.debug(f"Selected stream: {stream}")
        logger.success(f"✅ Selected {len(stream_names)} streams")

    def select_all_streams(self):
        """Select every available stream (handy for exploration)."""
        if not self.source:
            logger.error("Source not configured")
            return
        self.source.select_all_streams()
        logger.success("✅ Selected all available streams")

    def read_to_cache(self, cache_name: Optional[str] = None):
        """
        Read the selected streams into the default local cache.

        Returns the cache object on success, or None on failure.
        """
        if not self.source:
            logger.error("Source not configured")
            return None
        try:
            # DuckDB-backed default cache shipped with PyAirbyte.
            self.cache = ab.get_default_cache()
            logger.info("⏳ Reading data from API (this may take several minutes)...")
            logger.info(" PyAirbyte is handling pagination, rate limits, and retries automatically")
            result = self.source.read(cache=self.cache)
            logger.success(f"✅ Data loaded into cache successfully")
            return self.cache
        except Exception as e:
            logger.error(f"❌ Failed to read data: {str(e)}")
            return None

    def get_stream_dataframe(self, stream_name: str) -> Optional[pd.DataFrame]:
        """Materialize one cached stream as a pandas DataFrame (None on error)."""
        if not self.cache:
            logger.error("No cache available. Run read_to_cache() first.")
            return None
        try:
            df = self.cache[stream_name].to_pandas()
            logger.info(f"📈 Loaded stream '{stream_name}': {len(df)} rows, {len(df.columns)} columns")
            return df
        except Exception as e:
            logger.error(f"❌ Failed to load stream '{stream_name}': {str(e)}")
            return None

    def get_report_dataframe(self, report_type: ReportType) -> Optional[pd.DataFrame]:
        """Fetch the DataFrame for a report type via its configured stream name."""
        stream_name = self.config.STREAMS.get(report_type)
        if not stream_name:
            logger.error(f"Unknown report type: {report_type}")
            return None
        return self.get_stream_dataframe(stream_name)

    def get_all_campaign_data(self) -> Dict[str, pd.DataFrame]:
        """
        Collect every DataFrame needed for the Campaigns-9 workbook.

        Returns a dict with any of the keys 'campaigns', 'targeting',
        'search_terms' — a key is absent when that stream failed to load.
        """
        result = {}
        for key, report_type in (
            ("campaigns", ReportType.CAMPAIGNS),
            ("targeting", ReportType.TARGETING),
            ("search_terms", ReportType.SEARCH_TERMS),
        ):
            df = self.get_report_dataframe(report_type)
            if df is not None:
                result[key] = df
        return result
python
# src/excel_generator.py
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, Optional, List
from loguru import logger
import openpyxl
from openpyxl.styles import Font, PatternFill, Alignment, Border, Side
from openpyxl.utils.dataframe import dataframe_to_rows
from config.settings import Config
class ExcelGenerator:
    """
    Creates formatted Excel workbooks in the Campaigns-9 style.

    Each ``create_*`` method writes one sheet through a shared
    ``pd.ExcelWriter``; ``generate_workbook`` orchestrates the whole file,
    falling back to placeholder data when a DataFrame is missing.
    """

    def __init__(self, output_dir: str = None):
        """
        Args:
            output_dir: Directory for generated workbooks. Defaults to
                ``Config.OUTPUT_DIR``; created if it does not exist.
        """
        self.config = Config()
        self.output_dir = Path(output_dir or self.config.OUTPUT_DIR)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        # Reusable openpyxl styles.
        self.header_font = Font(bold=True, color="FFFFFF")
        self.header_fill = PatternFill(start_color="366092", end_color="366092", fill_type="solid")
        self.date_fill = PatternFill(start_color="D3D3D3", end_color="D3D3D3", fill_type="solid")  # Gray for date rows
        self.todo_fill = PatternFill(start_color="FFE699", end_color="FFE699", fill_type="solid")  # Yellow for TODO
        self.thin_border = Border(
            left=Side(style='thin'),
            right=Side(style='thin'),
            top=Side(style='thin'),
            bottom=Side(style='thin')
        )

    def generate_filename(self) -> str:
        """Generate filename in the Ads_sfq-16_YYYYMMDD.xlsx format."""
        today = datetime.now().strftime(self.config.FILE_DATE_FORMAT)
        return f"Ads_sfq-16_{today}.xlsx"

    def create_campaigns_sheet(self, df: pd.DataFrame, writer: pd.ExcelWriter, sheet_name: str):
        """
        Create the main Campaigns-9 sheet: date-marker header, formatted
        metric columns (CTR, CPC, ACOS, ROAS), and a TODO section at the
        bottom. Falls back to placeholder rows when *df* is empty/None.
        """
        campaigns_data = df.copy() if df is not None else pd.DataFrame()
        if campaigns_data.empty:
            campaigns_data = self._create_placeholder_campaigns()

        # Sort by Orders descending, per the workbook's convention.
        if 'Orders' in campaigns_data.columns:
            campaigns_data = campaigns_data.sort_values('Orders', ascending=False)

        # Data starts at row 3 (0-based startrow=2) to leave room for headers.
        campaigns_data.to_excel(writer, sheet_name=sheet_name, index=False, startrow=2)

        workbook = writer.book
        worksheet = writer.sheets[sheet_name]

        self._add_campaigns_header(worksheet, workbook)
        self._format_campaigns_sheet(worksheet, campaigns_data)
        self._add_todo_section(worksheet, workbook, start_row=len(campaigns_data) + 10)
        logger.success(f"✅ Created '{sheet_name}' sheet with {len(campaigns_data)} rows")

    def _create_placeholder_campaigns(self) -> pd.DataFrame:
        """Build sample rows matching the Campaigns-9 column layout."""
        placeholder_data = {
            'State': ['ENABLED', 'ENABLED', 'ENABLED'],
            'Campaigns': [
                'Sound Rise II - alarm clock',
                'Sound Rise II - soundfreaq',
                'Sound Rise II - category 11'
            ],
            'Status': ['CAMPAIGN_STATUS_ENABLED'] * 3,
            'Start date': ['09/04/2024', '06/21/2024', '11/15/2024'],
            'End date': ['', '', ''],
            'Budget (converted)': ['$30.00', '$15.00', '$30.00'],
            'Budget': [30.00, 15.00, 30.00],
            'Clicks': [7052, 904, 4417],
            'CTR': [0.004, 0.0131, 0.0071],
            'Spend (converted)': [8319.69, 953.13, 4068.21],
            'Spend': [8319.69, 953.13, 4068.21],
            'CPC (converted)': [1.18, 1.05, 0.92],
            'CPC': [1.18, 1.05, 0.92],
            'Orders': [231, 116, 111],
            'Sales (converted)': [20779.81, 13415.83, 9592.97],
            'Sales': [20779.81, 13415.83, 9592.97],
            'ACOS': [0.4004, 0.071, 0.4241],
            'ROAS': [2.5, 14.08, 2.36],
        }
        df = pd.DataFrame(placeholder_data)
        # Derived metrics; CTR is a placeholder formula (no impressions column).
        df['CTR'] = df['Clicks'] / 1_000_000  # Placeholder calculation
        df['ACOS'] = df['Spend'] / df['Sales']
        df['ROAS'] = df['Sales'] / df['Spend']
        return df

    def _add_campaigns_header(self, worksheet, workbook):
        """Write the date-marker row and the styled column-header row."""
        # Date marker row (matches the "YYYY-MM-DD 00:00:00" separator rows).
        date_row = 1
        date_cell = worksheet.cell(row=date_row, column=1)
        date_cell.value = datetime.now().strftime("%Y-%m-%d 00:00:00")
        date_cell.font = Font(bold=True)
        date_cell.fill = self.date_fill

        header_row = 3
        headers = ['State', 'Campaigns', 'Status', 'Start date', 'End date',
                   'Budget (converted)', 'Budget', 'Clicks', 'CTR',
                   'Spend (converted)', 'Spend', 'CPC (converted)', 'CPC',
                   'Orders', 'Sales (converted)', 'Sales', 'ACOS', 'ROAS']
        for col, header in enumerate(headers, start=1):
            cell = worksheet.cell(row=header_row, column=col)
            cell.value = header
            cell.font = self.header_font
            cell.fill = self.header_fill
            cell.alignment = Alignment(horizontal='center')
            cell.border = self.thin_border

    def _format_campaigns_sheet(self, worksheet, df: pd.DataFrame):
        """Apply number formats, borders, and column widths to the data area."""
        # Currency columns.
        currency_cols = ['Budget', 'Spend', 'Sales', 'Budget (converted)',
                         'Spend (converted)', 'Sales (converted)']
        for col_name in currency_cols:
            if col_name in df.columns:
                col_idx = df.columns.get_loc(col_name) + 1
                for row in range(4, len(df) + 4):
                    worksheet.cell(row=row, column=col_idx).number_format = '"$"#,##0.00'

        # Percentage columns.
        for col_name in ['CTR', 'ACOS']:
            if col_name in df.columns:
                col_idx = df.columns.get_loc(col_name) + 1
                for row in range(4, len(df) + 4):
                    worksheet.cell(row=row, column=col_idx).number_format = '0.0000%'

        # CPC columns (currency-formatted as well).
        for col_name in ['CPC', 'CPC (converted)']:
            if col_name in df.columns:
                col_idx = df.columns.get_loc(col_name) + 1
                for row in range(4, len(df) + 4):
                    worksheet.cell(row=row, column=col_idx).number_format = '"$"#,##0.00'

        # Borders around every data cell.
        for row in range(4, len(df) + 4):
            for col in range(1, len(df.columns) + 1):
                worksheet.cell(row=row, column=col).border = self.thin_border

        # Auto-adjust column widths (capped at 50 chars).
        # FIX: replaced a bare `except: pass` with an explicit computation.
        for column_cells in worksheet.columns:
            letter = column_cells[0].column_letter
            max_length = max(
                (len(str(cell.value)) for cell in column_cells if cell.value is not None),
                default=0,
            )
            worksheet.column_dimensions[letter].width = min(max_length + 2, 50)

    def _add_todo_section(self, worksheet, workbook, start_row: int):
        """Append the standing TODO list below the data area."""
        todo_header = worksheet.cell(row=start_row, column=1)
        todo_header.value = "TO DO LIST"
        todo_header.font = Font(bold=True, size=12)
        todo_header.fill = self.todo_fill
        todos = [
            "1. reduce daily budget (category 11 & 2026) to $10/day, until we have a better approach",
            "2. category 2026 (bid adjustments - use historical cpc from category 11; negative keywords - add; negative products - add; wood skus - kick out)",
            "3. shall we increase spend (by 1. adding more asin targets; 2. increasing bids?)",
            "4. Please take actions and let's discuss once you are done."
        ]
        for i, todo in enumerate(todos, start=1):
            cell = worksheet.cell(row=start_row + i, column=1)
            cell.value = todo
            cell.font = Font(italic=True)

    def create_targeting_sheet(self, df: pd.DataFrame, writer: pd.ExcelWriter, sheet_name: str):
        """Write the Targeting sheet; uses sample keyword rows when *df* is empty."""
        if df is not None and not df.empty:
            df.to_excel(writer, sheet_name=sheet_name, index=False)
            logger.success(f"✅ Created '{sheet_name}' sheet with {len(df)} rows")
        else:
            placeholder = pd.DataFrame({
                'Keyword': ['bose clock radio', 'sony clock radio', 'ihome clock radio'],
                'Match type': ['PHRASE'] * 3,
                'Status': ['ENABLED'] * 3,
                'Impressions': [45630, 20549, 25468],
                'Clicks': [158, 82, 59],
                'CTR': [0.0035, 0.004, 0.0023],
                'Spend(USD)': [210.42, 90.49, 65.94],
                'Orders': [14, 7, 7],
                'Sales(USD)': [1249.86, 569.93, 679.93],
                'ACOS': [0.1684, 0.1588, 0.097],
            })
            placeholder.to_excel(writer, sheet_name=sheet_name, index=False)
            logger.warning(f"⚠️ Created placeholder for '{sheet_name}'")

    def create_bid_adjustments_sheet(self, writer: pd.ExcelWriter):
        """Write the Bid adjustments sheet (static placement placeholder data)."""
        data = {
            'Placements': ['PLACEMENT_GROUP_TOP', 'PLACEMENT_GROUP_OTHER', 'PLACEMENT_GROUP_DETAIL_PAGE'],
            'Campaign bidding strategy': ['Fixed bids'] * 3,
            'Bid adjustment': [0, 0, 0],
            'Impressions': [1273, 876, 19654],
            'Clicks': [170, 22, 53],
            'CTR': [0.1335, 0.0251, 0.0027],
            'Spend(USD)': [115.3, 20.76, 70.61],
            'Orders': [28, 1, 9],
            'Sales(USD)': [2589.69, 79.99, 839.91],
            'ACOS': [0.0445, 0.2595, 0.0841],
        }
        pd.DataFrame(data).to_excel(writer, sheet_name='Bid adjustments', index=False)
        logger.success("✅ Created 'Bid adjustments' sheet")

    def create_negative_keywords_sheet(self, writer: pd.ExcelWriter):
        """Write the Negative keywords tracking sheet (sample rows)."""
        data = {
            'Campaign': ['clock radio 241128', 'category 11', 'local records'],
            'Negative Keywords': ['airplay', 'dab', 'alexa'],
            'Date Added': [datetime.now().strftime("%Y-%m-%d")] * 3,
        }
        pd.DataFrame(data).to_excel(writer, sheet_name='Negative keywords', index=False)
        logger.success("✅ Created 'Negative keywords' sheet")

    def generate_workbook(self, data: Dict[str, pd.DataFrame]) -> str:
        """
        Generate the complete Excel workbook and return its path as a string.

        Args:
            data: DataFrames keyed by 'campaigns', 'targeting', and
                'search_terms'; missing keys fall back to placeholder sheets.
        """
        filename = self.generate_filename()
        filepath = self.output_dir / filename
        # FIX: the log line previously printed the literal "(unknown)"
        # instead of interpolating the target path.
        logger.info(f"📗 Generating workbook: {filepath}")
        with pd.ExcelWriter(filepath, engine='openpyxl') as writer:
            # 1. Main Campaigns-9 sheet
            self.create_campaigns_sheet(data.get('campaigns'), writer, 'Campaigns-9')
            # 2. Targeting sheet
            self.create_targeting_sheet(data.get('targeting'), writer, 'Targeting')
            # 3. Bid adjustments sheet
            self.create_bid_adjustments_sheet(writer)
            # 4. Negative keywords sheet
            self.create_negative_keywords_sheet(writer)
            # 5. Category search terms sheet (only when data is available)
            search_terms_df = data.get('search_terms')
            if search_terms_df is not None:
                search_terms_df.to_excel(writer, sheet_name='Category_search_terms', index=False)
        logger.success(f"✅ Workbook generation complete: {filepath}")
        return str(filepath)
python
# src/logger_setup.py
import sys
from pathlib import Path
from loguru import logger
def setup_logger(log_dir: str = "logs"):
    """Configure loguru with a colored console sink and a rotating file sink.

    Args:
        log_dir: Directory for daily log files; created if absent.

    Returns:
        The configured loguru ``logger``.
    """
    # Drop loguru's default handler before installing ours.
    logger.remove()

    # Console sink: colored, INFO level and up.
    logger.add(
        sys.stderr,
        format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
        level="INFO",
        colorize=True
    )

    # File sink: DEBUG level, daily rotation, 30-day retention, zipped.
    log_path = Path(log_dir)
    log_path.mkdir(exist_ok=True)
    logger.add(
        log_path / "amazon_ads_{time:YYYY-MM-DD}.log",
        format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}",
        level="DEBUG",
        rotation="1 day",
        retention="30 days",
        compression="zip"
    )
    return logger
scripts/run_weekly.py
python
#!/usr/bin/env python3
"""
Amazon Ads Weekly Data Automation
Run this script weekly to fetch campaign data and update your Campaigns-9 workbook
"""
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import click
from datetime import datetime, timedelta
from pathlib import Path
from loguru import logger
from src.api_client import AmazonAdsClient
from src.excel_generator import ExcelGenerator
from src.logger_setup import setup_logger
from src.auth import Credentials
from config.settings import Config, ReportType
# Setup logging
logger = setup_logger()
@click.command()
@click.option('--start-date', default=None, help='Start date for data (YYYY-MM-DD)')
@click.option('--end-date', default=None, help='End date for data (YYYY-MM-DD)')
@click.option('--test', is_flag=True, help='Run in test mode (no API calls)')
@click.option('--verbose', is_flag=True, help='Enable verbose logging')
def main(start_date, end_date, test, verbose):
    """
    🚀 Amazon Ads Weekly Automation

    Fetches campaign data from Amazon Ads API and generates your Campaigns-9 Excel workbook.
    Run this weekly (e.g., every Friday) to keep your reports up to date.
    """
    if verbose:
        logger.remove()
        logger.add(sys.stderr, level="DEBUG")

    logger.info("=" * 60)
    logger.info("📊 AMAZON ADS WEEKLY AUTOMATION STARTING")
    logger.info("=" * 60)

    # Parse dates; default window is the last 7 days ending yesterday.
    start = datetime.strptime(start_date, "%Y-%m-%d") if start_date else None
    end = datetime.strptime(end_date, "%Y-%m-%d") if end_date else None
    if not end:
        end = datetime.now() - timedelta(days=1)  # Yesterday
    if not start:
        start = end - timedelta(days=7)  # Last 7 days
    logger.info(f"📅 Report period: {start.strftime('%Y-%m-%d')} to {end.strftime('%Y-%m-%d')}")

    if test:
        logger.info("🧪 Running in TEST MODE - generating sample workbook")
        _run_test_mode(start, end)
        return

    # Refuse to run with placeholder credentials.
    if not Credentials.validate():
        logger.error("❌ Credentials not properly configured")
        logger.info(" Please update your .env file with actual credentials once approved.")
        logger.info(" For now, run with --test flag to see the workflow.")
        return

    # Step 1: Initialize API client.
    logger.info("\n🔌 Step 1: Initializing Amazon Ads API client...")
    client = AmazonAdsClient()
    if not client.configure_source(start_date=start):
        logger.error("❌ Failed to configure API client")
        return

    # Step 2: Select the streams backing each workbook sheet.
    logger.info("\n🔍 Step 2: Selecting data streams...")
    streams_to_select = [
        Config.STREAMS[ReportType.CAMPAIGNS],
        Config.STREAMS[ReportType.TARGETING],
        Config.STREAMS[ReportType.SEARCH_TERMS],
        "sponsored_product_ad_groups",
        "profiles",
    ]
    client.select_streams(streams_to_select)

    # Step 3: Extract into the local cache (PyAirbyte handles the API work).
    logger.info("\n📥 Step 3: Extracting data from Amazon Ads API...")
    logger.info(" ⏱️ This may take 5-10 minutes as reports generate")
    logger.info(" 🤖 PyAirbyte is handling pagination, rate limits, and retries automatically")
    cache = client.read_to_cache()
    if not cache:
        logger.error("❌ Failed to extract data")
        return

    # Step 4: Materialize the cached streams as DataFrames.
    logger.info("\n🔄 Step 4: Loading data into DataFrames...")
    data = {}
    campaigns_df = client.get_stream_dataframe(Config.STREAMS[ReportType.CAMPAIGNS])
    if campaigns_df is not None:
        logger.info(f" Campaigns: {len(campaigns_df)} rows")
        data['campaigns'] = campaigns_df
    targeting_df = client.get_stream_dataframe(Config.STREAMS[ReportType.TARGETING])
    if targeting_df is not None:
        logger.info(f" Targeting: {len(targeting_df)} rows")
        data['targeting'] = targeting_df
    search_df = client.get_stream_dataframe(Config.STREAMS[ReportType.SEARCH_TERMS])
    if search_df is not None:
        logger.info(f" Search terms: {len(search_df)} rows")
        data['search_terms'] = search_df

    # Step 5: Generate the Excel workbook.
    logger.info("\n📗 Step 5: Generating Campaigns-9 Excel workbook...")
    excel_gen = ExcelGenerator()
    output_file = excel_gen.generate_workbook(data)

    # Step 6: Summary.
    logger.info("\n" + "=" * 60)
    logger.success("✅ WEEKLY AUTOMATION COMPLETED SUCCESSFULLY")
    logger.info("=" * 60)
    logger.info(f"📁 Output file: {output_file}")
    if campaigns_df is not None:
        # Column names here are the connector's raw report fields.
        total_spend = campaigns_df['cost'].sum() if 'cost' in campaigns_df.columns else 0
        total_orders = campaigns_df['purchases'].sum() if 'purchases' in campaigns_df.columns else 0
        total_sales = campaigns_df['sales'].sum() if 'sales' in campaigns_df.columns else 0
        logger.info("\n📊 Weekly Summary:")
        logger.info(f" Total Spend: ${total_spend:,.2f}")
        logger.info(f" Total Orders: {total_orders}")
        logger.info(f" Total Sales: ${total_sales:,.2f}")
        # FIX: guard on total_sales as well — the original divided by
        # total_sales while only checking total_spend, raising
        # ZeroDivisionError when there were costs but no sales.
        if total_spend > 0 and total_sales > 0:
            logger.info(f" Overall ACOS: {(total_spend/total_sales)*100:.2f}%")
    logger.info("\n🏁 Done! Ready for your analysis and TODO list.")
def _run_test_mode(start_date: datetime, end_date: datetime):
    """Generate a sample workbook with placeholder data, skipping all API calls."""
    logger.info("🧪 Creating sample workbook with placeholder data...")
    generator = ExcelGenerator()
    # An empty dict makes every sheet fall back to its placeholder content.
    output_file = generator.generate_workbook({})
    logger.success(f"✅ Test workbook created: {output_file}")
    logger.info(" When your API credentials are ready, remove the --test flag to get real data.")
    main()

scripts/schedule_weekly.py (Optional - for automated scheduling)
python
#!/usr/bin/env python3
"""
Scheduler for weekly automation
Run this as a background process or use with cron/Task Scheduler
"""
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import schedule
import time
from datetime import datetime
from loguru import logger
from src.logger_setup import setup_logger
logger = setup_logger()
def run_weekly_job():
    """Run the weekly automation script in a subprocess and log the outcome."""
    logger.info("🕒 Scheduled weekly job triggered")
    # Import here to avoid circular imports
    import subprocess
    # FIX: resolve the script relative to this file instead of the process
    # CWD, so the job still works when the scheduler is launched from
    # another directory (e.g. by cron or a service manager).
    script_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "run_weekly.py")
    result = subprocess.run(
        [sys.executable, script_path],
        capture_output=True,
        text=True
    )
    if result.returncode == 0:
        logger.success("✅ Scheduled job completed successfully")
        logger.debug(f"Output: {result.stdout}")
    else:
        logger.error(f"❌ Scheduled job failed: {result.stderr}")
def main():
    """Start the blocking scheduler loop for the weekly automation job."""
    logger.info("⏰ Starting Amazon Ads weekly scheduler")
    # Weekly run: every Friday at 09:00 local time.
    schedule.every().friday.at("09:00").do(run_weekly_job)
    # For quick testing, uncomment to fire every minute instead:
    # schedule.every(1).minutes.do(run_weekly_job)
    logger.info("Scheduler running. Next run: Friday at 09:00")
    logger.info("Press Ctrl+C to stop")
    try:
        while True:
            schedule.run_pending()
            time.sleep(60)  # poll for due jobs once per minute
    except KeyboardInterrupt:
        logger.info("🛑 Scheduler stopped by user")
    main()

Part 6: Testing Scripts
tests/test_placeholder.py
python
#!/usr/bin/env python3
"""
Test script to verify everything works without API credentials
"""
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from src.excel_generator import ExcelGenerator
from src.logger_setup import setup_logger
logger = setup_logger()
def test_placeholder_generation():
    """Smoke test: the generator must produce a workbook from empty input."""
    logger.info("🧪 Testing placeholder workbook generation")
    output_file = ExcelGenerator().generate_workbook({})
    logger.success(f"✅ Test passed! Workbook created: {output_file}")
    return True
if __name__ == "__main__":
    test_placeholder_generation()

Part 7: Make Scripts Executable
bash
# Make the Python scripts executable
chmod +x scripts/run_weekly.py
chmod +x scripts/schedule_weekly.py
chmod +x tests/test_placeholder.py
# Create a convenience runner script
cat > run.sh << 'EOF'
#!/bin/bash
source venv/bin/activate
python scripts/run_weekly.py "$@"
EOF
chmod +x run.sh

Part 8: README and Documentation
README.md
markdown
# Amazon Ads Weekly Automation
## Overview
This system automates the weekly collection of Amazon Advertising data and generates a beautifully formatted Excel workbook in your "Campaigns-9" style.
## Features
- ✅ PyAirbyte-powered - handles all API complexity automatically
- ✅ Secure credential management with .env
- ✅ Beautiful logging with loguru
- ✅ Campaigns-9 style workbook with proper formatting
- ✅ TODO list section preserved
- ✅ Test mode for pre-approval development
- ✅ Scheduled execution (optional)
## Quick Start
### 1. Setup Environment
```bash
python3 -m venv venv
source venv/bin/activate
pip install -r requirements.txt
```

### 2. Configure Credentials
Edit .env with your API credentials once approved.
3. Test Without Credentials
bash
python scripts/run_weekly.py --test

### 4. Run Real Automation
bash
python scripts/run_weekly.py

### 5. Schedule Weekly Runs
bash
# Option 1: Use the built-in scheduler
python scripts/schedule_weekly.py
# Option 2: Use cron (macOS/Linux)
# Add to crontab: 0 9 * * 5 cd /path/to/project && ./run.sh
# Option 3: Use Task Scheduler (Windows)

## File Structure
amazon-ads-automation/
├── .env # Your credentials (never commit!)
├── requirements.txt # Dependencies
├── config/
│ └── settings.py # Configuration settings
├── src/
│ ├── auth.py # Credential management
│ ├── api_client.py # PyAirbyte client
│ ├── excel_generator.py # Excel formatting
│ └── logger_setup.py # Logging setup
├── scripts/
│ ├── run_weekly.py # Main automation script
│ └── schedule_weekly.py # Optional scheduler
├── tests/
│ └── test_placeholder.py # Pre-approval testing
└── output/
└── reports/ # Generated Excel files

## What PyAirbyte Handles For You
- 🔄 OAuth2 authentication and token refresh
- ⏱️ Report generation polling (up to 30 minutes)
- 📊 Pagination of large result sets
- 🚦 Rate limiting and retries
- 💾 Automatic caching with DuckDB
- 🔌 20+ pre-built data streams
### Final Step: Test It Now!
Even without API credentials, you can verify everything works:
```bash
# 1. Activate virtual environment
source venv_312/bin/activate
# 2. Run the test
python tests/test_placeholder.py
# 3. Run the main script in test mode
python scripts/run_weekly.py --test
# 4. Check the output
ls -la output/reports/

You should see a beautifully formatted Excel file with your Campaigns-9 structure, complete with TODO lists and proper formatting!
What's Next?
Once your API credentials are approved:
- Update
.envwith your actualCLIENT_ID,CLIENT_SECRET,REFRESH_TOKEN, andPROFILE_IDS - Run
python scripts/run_weekly.pyto get real data - The script will automatically:
- Connect to Amazon Ads API
- Request the last 7 days of data
- Wait for reports to generate (PyAirbyte handles this)
- Download and transform the data
- Generate your Campaigns-9 Excel workbook
The beauty of PyAirbyte is that you don't need to write any code for the API complexities - it's all handled for you!
bash
# ⚠️ SECURITY: this snippet previously contained what appeared to be LIVE
# credentials (refresh token, client ID, client secret). They have been
# redacted. If those values were real, rotate/revoke them in the Amazon
# developer console immediately — never paste real secrets into documents
# or version control. Load them from your .env file instead.
curl -X POST \
  https://api.amazon.com/auth/o2/token \
  -d "grant_type=refresh_token" \
  -d "refresh_token=YOUR_REFRESH_TOKEN" \
  -d "client_id=YOUR_CLIENT_ID" \
  -d "client_secret=YOUR_CLIENT_SECRET"