
Let's create a complete, production-ready PyAirbyte-based automation suite. This is a significant upgrade over the direct SDK approach: PyAirbyte handles all the API complexity for you.

Complete PyAirbyte-Based Automation Guide

Part 1: Environment Setup

bash
# 1. Create and navigate to your project directory
mkdir ~/amazon-ads-automation
cd ~/amazon-ads-automation

# 2. Create a virtual environment (isolates dependencies)
python3 -m venv venv_312

# 3. Activate the virtual environment
# On macOS/Linux:
source venv_312/bin/activate
# On Windows:
# venv_312\Scripts\activate

# 4. Verify you're in the virtual environment
# You should see (venv_312) at the beginning of your terminal prompt
which python  # Should point to your venv directory

# 5. Upgrade pip to latest version
pip install --upgrade pip

# 6. Create requirements.txt file
cat > requirements.txt << 'EOF'
# Core PyAirbyte - the magic sauce
airbyte>=0.10.0

# Data manipulation
pandas>=2.0.0
numpy>=1.24.0

# Excel handling
openpyxl>=3.1.0
xlsxwriter>=3.1.0  # Better Excel formatting support

# Environment and config
python-dotenv>=1.0.0

# CLI and scheduling
click>=8.1.0
schedule>=1.2.0

# Logging and debugging
loguru>=0.7.0
EOF

# 7. Install all dependencies
pip install -r requirements.txt

# 8. Verify PyAirbyte installed correctly
python -c "import airbyte; print(f'✅ PyAirbyte version: {airbyte.__version__}')"

Part 2: Project Structure

Create the complete project structure:

bash
# Create directory structure
mkdir -p config src scripts data/raw data/cache output/reports logs tests

# Create __init__.py files to make Python packages
# (config needs one too, since the code imports config.settings)
touch config/__init__.py
touch src/__init__.py
touch tests/__init__.py

# Create .gitignore
cat > .gitignore << 'EOF'
# Virtual environment
venv/
.env
__pycache__/
*.pyc

# Data files
data/raw/*
data/cache/*
!data/raw/.gitkeep
!data/cache/.gitkeep

# Output files
output/reports/*
!output/reports/.gitkeep

# Logs
logs/*
!logs/.gitkeep

# IDE files
.vscode/
.idea/
*.swp
*.swo

# OS files
.DS_Store
Thumbs.db
EOF

# Create placeholder directories
touch data/raw/.gitkeep data/cache/.gitkeep output/reports/.gitkeep logs/.gitkeep
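If you want a quick sanity check that the scaffold came out right, a small stdlib-only script (a hypothetical `verify_layout.py`, not part of the suite itself) can confirm every expected directory exists:

```python
# verify_layout.py - sanity-check the directory scaffold created above
from pathlib import Path

EXPECTED_DIRS = [
    "config", "src", "scripts", "data/raw", "data/cache",
    "output/reports", "logs", "tests",
]

def missing_dirs(root: str = ".") -> list[str]:
    """Return the expected directories that don't exist under root."""
    base = Path(root)
    return [d for d in EXPECTED_DIRS if not (base / d).is_dir()]

if __name__ == "__main__":
    gaps = missing_dirs()
    print("✅ project layout OK" if not gaps else f"⚠️ missing: {gaps}")
```

Run it from the project root after the `mkdir -p` step; an empty "missing" list means you're ready for Part 3.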

Part 3: Configuration Files

config/settings.py

python
# config/settings.py
from enum import Enum
from datetime import datetime, timedelta

class Region(Enum):
    """Amazon Advertising API regions"""
    NA = "NA"
    EU = "EU"
    FE = "FE"

class ReportType(Enum):
    """Report types that map to your workbook sheets"""
    CAMPAIGNS = "campaigns"
    TARGETING = "targeting"
    SEARCH_TERMS = "search_terms"
    AD_GROUPS = "ad_groups"
    PRODUCT_ADS = "product_ads"

class Config:
    """Central configuration settings"""

    # API Settings
    DEFAULT_REGION = Region.NA
    MARKETPLACE_IDS = {
        Region.NA: ["ATVPDKIKX0DER"],  # US
        Region.EU: ["A1PA6795UKMFR9"],  # DE
        Region.FE: ["A1VC38T8ZF5NL"],   # JP
    }

    # Data Settings
    LOOK_BACK_WINDOW = 30  # Days to look back for initial data
    REPORT_TIMEOUT_MINUTES = 30
    REPORT_MAX_RETRIES = 5

    # Stream names from PyAirbyte
    STREAMS = {
        ReportType.CAMPAIGNS: "sponsored_product_campaigns",
        ReportType.TARGETING: "sponsored_product_targetings",
        ReportType.SEARCH_TERMS: "sponsored_product_search_term_report",
        ReportType.AD_GROUPS: "sponsored_product_ad_groups",
        ReportType.PRODUCT_ADS: "sponsored_product_product_ads",
    }

    # Mapping to your Excel sheet names
    EXCEL_SHEETS = {
        ReportType.CAMPAIGNS: "Campaigns-9",
        ReportType.TARGETING: "Targeting",
        ReportType.SEARCH_TERMS: "Search Terms",
    }

    # File paths
    RAW_DATA_DIR = "data/raw"
    CACHE_DIR = "data/cache"
    OUTPUT_DIR = "output/reports"
    LOG_DIR = "logs"

    # Date formats
    DATE_FORMAT = "%Y-%m-%d"
    FILE_DATE_FORMAT = "%Y%m%d"
    EXCEL_DATE_FORMAT = "mm/dd/yyyy"

.env (Placeholder - NEVER commit this!)

bash
# .env - Amazon Ads API Credentials
# Fill these in when your API access is approved!

# Required - from LWA console
CLIENT_ID=your_client_id_here
CLIENT_SECRET=your_client_secret_here

# Required - from OAuth flow
REFRESH_TOKEN=your_refresh_token_here

# Required - from profiles API call
PROFILE_IDS=123456789  # Comma-separated if multiple

# Region: NA, EU, or FE
REGION=NA

# Optional settings
LOOK_BACK_DAYS=60
LOG_LEVEL=INFO

Part 4: Core Modules

src/auth.py

python
# src/auth.py
import os
from typing import Dict, List
from dotenv import load_dotenv
from loguru import logger

# Load environment variables
load_dotenv()

class Credentials:
    """Secure credential management"""

    # Load from environment
    CLIENT_ID = os.getenv("CLIENT_ID", "")
    CLIENT_SECRET = os.getenv("CLIENT_SECRET", "")
    REFRESH_TOKEN = os.getenv("REFRESH_TOKEN", "")
    PROFILE_IDS = os.getenv("PROFILE_IDS", "")
    REGION = os.getenv("REGION", "NA")

    @classmethod
    def validate(cls) -> bool:
        """Validate that credentials are configured"""
        missing = []

        if not cls.CLIENT_ID or "your_client_id" in cls.CLIENT_ID:
            missing.append("CLIENT_ID")
        if not cls.CLIENT_SECRET or "your_client_secret" in cls.CLIENT_SECRET:
            missing.append("CLIENT_SECRET")
        if not cls.REFRESH_TOKEN or "your_refresh_token" in cls.REFRESH_TOKEN:
            missing.append("REFRESH_TOKEN")
        if not cls.PROFILE_IDS:
            missing.append("PROFILE_IDS")

        if missing:
            logger.warning(f"Missing credentials: {', '.join(missing)}")
            logger.warning("Please update your .env file with actual credentials once approved.")
            return False
        return True

    @classmethod
    def get_profile_list(cls) -> List[int]:
        """Convert comma-separated profile IDs to list of integers"""
        if not cls.PROFILE_IDS:
            return []
        return [int(pid.strip()) for pid in cls.PROFILE_IDS.split(",")]

    @classmethod
    def get_api_config(cls) -> Dict:
        """Get complete API configuration for PyAirbyte"""
        return {
            "auth_type": "oauth2.0",
            "client_id": cls.CLIENT_ID,
            "client_secret": cls.CLIENT_SECRET,
            "refresh_token": cls.REFRESH_TOKEN,
            "region": cls.REGION,
            "profiles": cls.get_profile_list(),
            "start_date": "2024-01-01",  # Will be overridden in code
            "state_filter": ["enabled", "paused"],
            "look_back_window": 30,
            "report_wait_timeout": 30,
            "report_generation_max_retries": 5,
        }
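The comma-separated `PROFILE_IDS` handling is easy to sanity-check in isolation. A standalone sketch mirroring `get_profile_list` (with one small addition: empty segments from trailing commas are skipped):

```python
# Standalone sketch of the PROFILE_IDS parsing used by Credentials.get_profile_list
def parse_profile_ids(raw: str) -> list[int]:
    """Convert a comma-separated string like '123, 456' into [123, 456]."""
    if not raw:
        return []
    # strip() tolerates spaces after commas; the filter tolerates trailing commas
    return [int(pid.strip()) for pid in raw.split(",") if pid.strip()]
```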

src/api_client.py

python
# src/api_client.py
import airbyte as ab
import pandas as pd
from datetime import datetime, timedelta
from typing import Optional, List, Dict
from loguru import logger

from config.settings import Config, ReportType
from src.auth import Credentials

class AmazonAdsClient:
    """
    PyAirbyte-powered Amazon Ads API client
    Handles all data extraction with built-in retries and caching
    """

    def __init__(self):
        self.credentials = Credentials()
        self.source = None
        self.cache = None
        self.config = Config()

    def configure_source(self, start_date: Optional[datetime] = None):
        """
        Configure the PyAirbyte source connector
        This is where all the API complexity is handled for us!
        """
        if not self.credentials.validate():
            logger.warning("Using placeholder configuration for testing")

        # Get base config from credentials
        base_config = self.credentials.get_api_config()

        # Set start date (default to LOOK_BACK_WINDOW days ago)
        if start_date is None:
            start_date = datetime.now() - timedelta(days=self.config.LOOK_BACK_WINDOW)

        base_config["start_date"] = start_date.strftime("%Y-%m-%d")

        # Add marketplace IDs based on region
        base_config["marketplace_ids"] = self.config.MARKETPLACE_IDS.get(
            self.config.DEFAULT_REGION,
            ["ATVPDKIKX0DER"]  # Default to US
        )

        logger.info(f"Configuring PyAirbyte source for region: {base_config['region']}")
        logger.info(f"Start date: {base_config['start_date']}")
        logger.info(f"Profiles: {base_config['profiles']}")

        try:
            # This single line replaces hundreds of lines of custom API code!
            self.source = ab.get_source(
                "source-amazon-ads",
                install_if_missing=True,
                config=base_config
            )

            # Test the connection
            check_result = self.source.check()
            if check_result.get("status") == "SUCCEEDED":
                logger.success("✅ Successfully connected to Amazon Ads API")
                return True
            else:
                logger.error(f"❌ Connection failed: {check_result}")
                return False

        except Exception as e:
            logger.error(f"❌ Failed to configure source: {str(e)}")
            return False

    def list_available_streams(self) -> List[str]:
        """List all available data streams from the API"""
        if not self.source:
            logger.error("Source not configured. Call configure_source() first.")
            return []

        streams = self.source.get_available_streams()
        logger.info(f"📊 Available streams: {len(streams)} total (showing first 10)")
        for stream in streams[:10]:
            logger.info(f"   - {stream}")
        return streams

    def select_streams(self, stream_names: List[str]):
        """Select specific streams for extraction"""
        if not self.source:
            logger.error("Source not configured")
            return

        for stream in stream_names:
            self.source.select_stream(stream)
            logger.debug(f"Selected stream: {stream}")

        logger.success(f"✅ Selected {len(stream_names)} streams")

    def select_all_streams(self):
        """Select all available streams (useful for exploration)"""
        if not self.source:
            logger.error("Source not configured")
            return

        self.source.select_all_streams()
        logger.success("✅ Selected all available streams")

    def read_to_cache(self, cache_name: Optional[str] = None):
        """
        Read selected streams into local cache
        PyAirbyte handles all the pagination, rate limiting, and retries!
        """
        if not self.source:
            logger.error("Source not configured")
            return None

        try:
            # Use a DuckDB cache (bundled with PyAirbyte); honor a custom name if given
            if cache_name:
                self.cache = ab.new_local_cache(cache_name)
            else:
                self.cache = ab.get_default_cache()

            logger.info("⏳ Reading data from API (this may take several minutes)...")
            logger.info("   PyAirbyte is handling pagination, rate limits, and retries automatically")

            # This is where the magic happens
            self.source.read(cache=self.cache)

            logger.success("✅ Data loaded into cache successfully")
            return self.cache

        except Exception as e:
            logger.error(f"❌ Failed to read data: {str(e)}")
            return None

    def get_stream_dataframe(self, stream_name: str) -> Optional[pd.DataFrame]:
        """
        Convert a cached stream to pandas DataFrame
        This is where we get the actual data for analysis
        """
        if not self.cache:
            logger.error("No cache available. Run read_to_cache() first.")
            return None

        try:
            df = self.cache[stream_name].to_pandas()
            logger.info(f"📈 Loaded stream '{stream_name}': {len(df)} rows, {len(df.columns)} columns")
            return df
        except Exception as e:
            logger.error(f"❌ Failed to load stream '{stream_name}': {str(e)}")
            return None

    def get_report_dataframe(self, report_type: ReportType) -> Optional[pd.DataFrame]:
        """
        Convenience method to get a specific report type
        Maps your report types to the correct stream names
        """
        stream_name = self.config.STREAMS.get(report_type)
        if not stream_name:
            logger.error(f"Unknown report type: {report_type}")
            return None

        return self.get_stream_dataframe(stream_name)

    def get_all_campaign_data(self) -> Dict[str, pd.DataFrame]:
        """
        Get all data needed for your Campaigns-9 workbook
        Returns a dictionary of DataFrames
        """
        result = {}

        # Get campaigns data
        campaigns_df = self.get_report_dataframe(ReportType.CAMPAIGNS)
        if campaigns_df is not None:
            result['campaigns'] = campaigns_df

        # Get targeting data
        targeting_df = self.get_report_dataframe(ReportType.TARGETING)
        if targeting_df is not None:
            result['targeting'] = targeting_df

        # Get search terms
        search_terms_df = self.get_report_dataframe(ReportType.SEARCH_TERMS)
        if search_terms_df is not None:
            result['search_terms'] = search_terms_df

        return result
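As a usage sketch, here is the call order the client is meant to be driven in (configure, select, read, then pull frames), wrapped in one helper. The `client` argument is anything exposing the four public methods above, so the helper itself is just plumbing, not an API claim:

```python
# Usage sketch: the call order for AmazonAdsClient (configure -> select -> read -> frames)
def fetch_frames(client, stream_names):
    """Return a dict of stream name -> DataFrame, or {} if any stage fails."""
    if not client.configure_source():
        return {}
    client.select_streams(stream_names)
    if client.read_to_cache() is None:
        return {}
    frames = {name: client.get_stream_dataframe(name) for name in stream_names}
    # Drop streams that failed to load (get_stream_dataframe returns None on error)
    return {name: df for name, df in frames.items() if df is not None}
```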

src/excel_generator.py

python
# src/excel_generator.py
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, Optional, List
from loguru import logger
import openpyxl
from openpyxl.styles import Font, PatternFill, Alignment, Border, Side
from openpyxl.utils.dataframe import dataframe_to_rows

from config.settings import Config

class ExcelGenerator:
    """
    Creates beautifully formatted Excel workbooks in your Campaigns-9 style
    Handles all the formatting, calculations, and structure you've designed
    """

    def __init__(self, output_dir: str = None):
        self.config = Config()
        self.output_dir = Path(output_dir or self.config.OUTPUT_DIR)
        self.output_dir.mkdir(parents=True, exist_ok=True)

        # Excel formatting styles
        self.header_font = Font(bold=True, color="FFFFFF")
        self.header_fill = PatternFill(start_color="366092", end_color="366092", fill_type="solid")
        self.date_fill = PatternFill(start_color="D3D3D3", end_color="D3D3D3", fill_type="solid")  # Gray for date rows
        self.todo_fill = PatternFill(start_color="FFE699", end_color="FFE699", fill_type="solid")  # Yellow for TODO

        self.thin_border = Border(
            left=Side(style='thin'),
            right=Side(style='thin'),
            top=Side(style='thin'),
            bottom=Side(style='thin')
        )

    def generate_filename(self) -> str:
        """Generate filename in your format: Ads_sfq-16_YYYYMMDD.xlsx"""
        today = datetime.now().strftime(self.config.FILE_DATE_FORMAT)
        return f"Ads_sfq-16_{today}.xlsx"

    def create_campaigns_sheet(self, df: pd.DataFrame, writer: pd.ExcelWriter, sheet_name: str):
        """
        Create the main Campaigns-9 sheet with your exact structure:
        - Date markers as separators
        - Lifetime and Last 7 days sections
        - Calculated metrics (CTR, CPC, ACOS, ROAS)
        - Summary rows with totals
        - TODO list section
        """

        # Create a copy to work with
        campaigns_data = df.copy() if df is not None else pd.DataFrame()

        # If no real data, create placeholder with your structure
        if campaigns_data.empty:
            campaigns_data = self._create_placeholder_campaigns()

        # Sort by Orders descending as per your instructions
        if 'Orders' in campaigns_data.columns:
            campaigns_data = campaigns_data.sort_values('Orders', ascending=False)

        # Write to Excel
        campaigns_data.to_excel(writer, sheet_name=sheet_name, index=False, startrow=2)

        # Get workbook and worksheet for formatting
        workbook = writer.book
        worksheet = writer.sheets[sheet_name]

        # Add your header with date markers and sections
        self._add_campaigns_header(worksheet, workbook)

        # Format the data
        self._format_campaigns_sheet(worksheet, campaigns_data)

        # Add TODO list section at the bottom
        self._add_todo_section(worksheet, workbook, start_row=len(campaigns_data) + 10)

        logger.success(f"✅ Created '{sheet_name}' sheet with {len(campaigns_data)} rows")

    def _create_placeholder_campaigns(self) -> pd.DataFrame:
        """Create placeholder data matching your Campaigns-9 structure"""

        placeholder_data = {
            'State': ['ENABLED', 'ENABLED', 'ENABLED'],
            'Campaigns': [
                'Sound Rise II - alarm clock',
                'Sound Rise II - soundfreaq',
                'Sound Rise II - category 11'
            ],
            'Status': ['CAMPAIGN_STATUS_ENABLED'] * 3,
            'Start date': ['09/04/2024', '06/21/2024', '11/15/2024'],
            'End date': ['', '', ''],
            'Budget (converted)': ['$30.00', '$15.00', '$30.00'],
            'Budget': [30.00, 15.00, 30.00],
            'Clicks': [7052, 904, 4417],
            'CTR': [0.004, 0.0131, 0.0071],
            'Spend (converted)': [8319.69, 953.13, 4068.21],
            'Spend': [8319.69, 953.13, 4068.21],
            'CPC (converted)': [1.18, 1.05, 0.92],
            'CPC': [1.18, 1.05, 0.92],
            'Orders': [231, 116, 111],
            'Sales (converted)': [20779.81, 13415.83, 9592.97],
            'Sales': [20779.81, 13415.83, 9592.97],
            'ACOS': [0.4004, 0.071, 0.4241],
            'ROAS': [2.5, 14.08, 2.36],
        }

        df = pd.DataFrame(placeholder_data)

        # Recompute the derived ratios so they stay consistent with Spend/Sales.
        # (CTR is left as provided: the placeholder has no Impressions column to derive it from.)
        df['ACOS'] = df['Spend'] / df['Sales']
        df['ROAS'] = df['Sales'] / df['Spend']

        return df

    def _add_campaigns_header(self, worksheet, workbook):
        """Add the structured header with date markers"""

        # Add date marker row (like your "2025-09-22 00:00:00" rows)
        date_row = 1
        date_cell = worksheet.cell(row=date_row, column=1)
        date_cell.value = datetime.now().strftime("%Y-%m-%d 00:00:00")
        date_cell.font = Font(bold=True)
        date_cell.fill = self.date_fill

        # Add column headers row
        header_row = 3
        headers = ['State', 'Campaigns', 'Status', 'Start date', 'End date',
                   'Budget (converted)', 'Budget', 'Clicks', 'CTR',
                   'Spend (converted)', 'Spend', 'CPC (converted)', 'CPC',
                   'Orders', 'Sales (converted)', 'Sales', 'ACOS', 'ROAS']

        for col, header in enumerate(headers, start=1):
            cell = worksheet.cell(row=header_row, column=col)
            cell.value = header
            cell.font = self.header_font
            cell.fill = self.header_fill
            cell.alignment = Alignment(horizontal='center')
            cell.border = self.thin_border

    def _format_campaigns_sheet(self, worksheet, df: pd.DataFrame):
        """Apply formatting to the campaigns sheet"""

        # Format currency columns
        currency_cols = ['Budget', 'Spend', 'Sales', 'Budget (converted)',
                         'Spend (converted)', 'Sales (converted)']
        for col_name in currency_cols:
            if col_name in df.columns:
                col_idx = df.columns.get_loc(col_name) + 1
                for row in range(4, len(df) + 4):
                    cell = worksheet.cell(row=row, column=col_idx)
                    cell.number_format = '"$"#,##0.00'

        # Format percentage columns
        pct_cols = ['CTR', 'ACOS']
        for col_name in pct_cols:
            if col_name in df.columns:
                col_idx = df.columns.get_loc(col_name) + 1
                for row in range(4, len(df) + 4):
                    cell = worksheet.cell(row=row, column=col_idx)
                    cell.number_format = '0.0000%'

        # Format CPC columns
        cpc_cols = ['CPC', 'CPC (converted)']
        for col_name in cpc_cols:
            if col_name in df.columns:
                col_idx = df.columns.get_loc(col_name) + 1
                for row in range(4, len(df) + 4):
                    cell = worksheet.cell(row=row, column=col_idx)
                    cell.number_format = '"$"#,##0.00'

        # Add borders to all data cells
        for row in range(4, len(df) + 4):
            for col in range(1, len(df.columns) + 1):
                cell = worksheet.cell(row=row, column=col)
                cell.border = self.thin_border

        # Auto-adjust column widths (capped at 50 characters)
        for col in worksheet.columns:
            column = col[0].column_letter
            max_length = max(
                (len(str(cell.value)) for cell in col if cell.value is not None),
                default=0,
            )
            worksheet.column_dimensions[column].width = min(max_length + 2, 50)

    def _add_todo_section(self, worksheet, workbook, start_row: int):
        """Add the TODO list section at the bottom"""

        # TODO header
        todo_header = worksheet.cell(row=start_row, column=1)
        todo_header.value = "TO DO LIST"
        todo_header.font = Font(bold=True, size=12)
        todo_header.fill = self.todo_fill

        # Add your standard TODO items
        todos = [
            "1. reduce daily budget (category 11 & 2026) to $10/day, until we have a better approach",
            "2. category 2026 (bid adjustments - use historical cpc from category 11; negative keywords - add; negative products - add; wood skus - kick out)",
            "3. shall we increase spend (by 1. adding more asin targets; 2. increasing bids?)",
            "4. Please take actions and let's discuss once you are done."
        ]

        for i, todo in enumerate(todos, start=1):
            cell = worksheet.cell(row=start_row + i, column=1)
            cell.value = todo
            cell.font = Font(italic=True)

    def create_targeting_sheet(self, df: pd.DataFrame, writer: pd.ExcelWriter, sheet_name: str):
        """Create the Targeting sheet with keyword performance data"""

        if df is not None and not df.empty:
            # Process targeting data
            df.to_excel(writer, sheet_name=sheet_name, index=False)
            logger.success(f"✅ Created '{sheet_name}' sheet with {len(df)} rows")
        else:
            # Create placeholder
            placeholder = pd.DataFrame({
                'Keyword': ['bose clock radio', 'sony clock radio', 'ihome clock radio'],
                'Match type': ['PHRASE'] * 3,
                'Status': ['ENABLED'] * 3,
                'Impressions': [45630, 20549, 25468],
                'Clicks': [158, 82, 59],
                'CTR': [0.0035, 0.004, 0.0023],
                'Spend(USD)': [210.42, 90.49, 65.94],
                'Orders': [14, 7, 7],
                'Sales(USD)': [1249.86, 569.93, 679.93],
                'ACOS': [0.1684, 0.1588, 0.097],
            })
            placeholder.to_excel(writer, sheet_name=sheet_name, index=False)
            logger.warning(f"⚠️ Created placeholder for '{sheet_name}'")

    def create_bid_adjustments_sheet(self, writer: pd.ExcelWriter):
        """Create the Bid adjustments sheet with placement data"""

        # Create placeholder matching your structure
        data = {
            'Placements': ['PLACEMENT_GROUP_TOP', 'PLACEMENT_GROUP_OTHER', 'PLACEMENT_GROUP_DETAIL_PAGE'],
            'Campaign bidding strategy': ['Fixed bids'] * 3,
            'Bid adjustment': [0, 0, 0],
            'Impressions': [1273, 876, 19654],
            'Clicks': [170, 22, 53],
            'CTR': [0.1335, 0.0251, 0.0027],
            'Spend(USD)': [115.3, 20.76, 70.61],
            'Orders': [28, 1, 9],
            'Sales(USD)': [2589.69, 79.99, 839.91],
            'ACOS': [0.0445, 0.2595, 0.0841],
        }

        df = pd.DataFrame(data)
        df.to_excel(writer, sheet_name='Bid adjustments', index=False)
        logger.success("✅ Created 'Bid adjustments' sheet")

    def create_negative_keywords_sheet(self, writer: pd.ExcelWriter):
        """Create the Negative keywords tracking sheet"""

        # Sample data from your file
        data = {
            'Campaign': ['clock radio 241128', 'category 11', 'local records'],
            'Negative Keywords': ['airplay', 'dab', 'alexa'],
            'Date Added': [datetime.now().strftime("%Y-%m-%d")] * 3,
        }

        df = pd.DataFrame(data)
        df.to_excel(writer, sheet_name='Negative keywords', index=False)
        logger.success("✅ Created 'Negative keywords' sheet")

    def generate_workbook(self, data: Dict[str, pd.DataFrame]) -> str:
        """
        Generate the complete Excel workbook with all sheets
        This is the main method that creates your final report
        """
        filename = self.generate_filename()
        filepath = self.output_dir / filename

        logger.info(f"📗 Generating workbook: {filename}")

        with pd.ExcelWriter(filepath, engine='openpyxl') as writer:

            # 1. Main Campaigns-9 sheet
            campaigns_df = data.get('campaigns')
            self.create_campaigns_sheet(campaigns_df, writer, 'Campaigns-9')

            # 2. Targeting sheet
            targeting_df = data.get('targeting')
            self.create_targeting_sheet(targeting_df, writer, 'Targeting')

            # 3. Bid adjustments sheet
            self.create_bid_adjustments_sheet(writer)

            # 4. Negative keywords sheet
            self.create_negative_keywords_sheet(writer)

            # 5. Category search terms sheet (if available)
            search_terms_df = data.get('search_terms')
            if search_terms_df is not None:
                search_terms_df.to_excel(writer, sheet_name='Category_search_terms', index=False)

            logger.success(f"✅ Workbook generation complete: {filepath}")

        return str(filepath)
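For reference, the derived metrics the sheets format are the standard Amazon Ads ratios. A standalone sketch with zero-denominator guards (the generator above formats these values; it doesn't compute them this way verbatim):

```python
# Standalone sketch of the derived advertising metrics used throughout the workbook
def derived_metrics(clicks: int, impressions: int, spend: float, sales: float) -> dict:
    """CTR, CPC, ACOS, and ROAS with zero-denominator guards."""
    return {
        "CTR": clicks / impressions if impressions else 0.0,   # click-through rate
        "CPC": spend / clicks if clicks else 0.0,              # cost per click
        "ACOS": spend / sales if sales else 0.0,               # ad cost of sales
        "ROAS": sales / spend if spend else 0.0,               # return on ad spend
    }
```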

src/logger_setup.py

python
# src/logger_setup.py
import sys
from pathlib import Path
from loguru import logger

def setup_logger(log_dir: str = "logs"):
    """Configure loguru for beautiful logging"""

    # Remove default handler
    logger.remove()

    # Add console handler with colors
    logger.add(
        sys.stderr,
        format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
        level="INFO",
        colorize=True
    )

    # Add file handler
    log_path = Path(log_dir)
    log_path.mkdir(exist_ok=True)

    logger.add(
        log_path / "amazon_ads_{time:YYYY-MM-DD}.log",
        format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}",
        level="DEBUG",
        rotation="1 day",
        retention="30 days",
        compression="zip"
    )

    return logger

Part 5: Main Orchestration Script

scripts/run_weekly.py

python
#!/usr/bin/env python3
"""
Amazon Ads Weekly Data Automation
Run this script weekly to fetch campaign data and update your Campaigns-9 workbook
"""

import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

import click
from datetime import datetime, timedelta
from pathlib import Path
from loguru import logger

from src.api_client import AmazonAdsClient
from src.excel_generator import ExcelGenerator
from src.logger_setup import setup_logger
from src.auth import Credentials
from config.settings import Config, ReportType

# Setup logging
logger = setup_logger()

@click.command()
@click.option('--start-date', default=None, help='Start date for data (YYYY-MM-DD)')
@click.option('--end-date', default=None, help='End date for data (YYYY-MM-DD)')
@click.option('--test', is_flag=True, help='Run in test mode (no API calls)')
@click.option('--verbose', is_flag=True, help='Enable verbose logging')
def main(start_date, end_date, test, verbose):
    """
    🚀 Amazon Ads Weekly Automation

    Fetches campaign data from Amazon Ads API and generates your Campaigns-9 Excel workbook.
    Run this weekly (e.g., every Friday) to keep your reports up to date.
    """

    if verbose:
        logger.remove()
        logger.add(sys.stderr, level="DEBUG")

    logger.info("=" * 60)
    logger.info("📊 AMAZON ADS WEEKLY AUTOMATION STARTING")
    logger.info("=" * 60)

    # Parse dates
    start = None
    end = None

    if start_date:
        start = datetime.strptime(start_date, "%Y-%m-%d")
    if end_date:
        end = datetime.strptime(end_date, "%Y-%m-%d")

    # Default to last 7 days
    if not end:
        end = datetime.now() - timedelta(days=1)  # Yesterday
    if not start:
        start = end - timedelta(days=7)  # Last 7 days

    logger.info(f"📅 Report period: {start.strftime('%Y-%m-%d')} to {end.strftime('%Y-%m-%d')}")

    if test:
        logger.info("🧪 Running in TEST MODE - generating sample workbook")
        _run_test_mode(start, end)
        return

    # Check credentials
    if not Credentials.validate():
        logger.error("❌ Credentials not properly configured")
        logger.info("   Please update your .env file with actual credentials once approved.")
        logger.info("   For now, run with --test flag to see the workflow.")
        return

    # Step 1: Initialize API client
    logger.info("\n🔌 Step 1: Initializing Amazon Ads API client...")
    client = AmazonAdsClient()

    if not client.configure_source(start_date=start):
        logger.error("❌ Failed to configure API client")
        return

    # Step 2: Select relevant streams
    logger.info("\n🔍 Step 2: Selecting data streams...")

    # Map your report types to stream names
    streams_to_select = [
        Config.STREAMS[ReportType.CAMPAIGNS],
        Config.STREAMS[ReportType.TARGETING],
        Config.STREAMS[ReportType.SEARCH_TERMS],
        "sponsored_product_ad_groups",
        "profiles",
    ]

    client.select_streams(streams_to_select)

    # Step 3: Extract data (PyAirbyte handles all the complexity!)
    logger.info("\n📥 Step 3: Extracting data from Amazon Ads API...")
    logger.info("   ⏱️  This may take 5-10 minutes as reports generate")
    logger.info("   🤖 PyAirbyte is handling pagination, rate limits, and retries automatically")

    cache = client.read_to_cache()
    if not cache:
        logger.error("❌ Failed to extract data")
        return

    # Step 4: Load data into DataFrames
    logger.info("\n🔄 Step 4: Loading data into DataFrames...")

    data = {}

    # Get campaigns data
    campaigns_df = client.get_stream_dataframe(Config.STREAMS[ReportType.CAMPAIGNS])
    if campaigns_df is not None:
        logger.info(f"   Campaigns: {len(campaigns_df)} rows")
        data['campaigns'] = campaigns_df

    # Get targeting data
    targeting_df = client.get_stream_dataframe(Config.STREAMS[ReportType.TARGETING])
    if targeting_df is not None:
        logger.info(f"   Targeting: {len(targeting_df)} rows")
        data['targeting'] = targeting_df

    # Get search terms
    search_df = client.get_stream_dataframe(Config.STREAMS[ReportType.SEARCH_TERMS])
    if search_df is not None:
        logger.info(f"   Search terms: {len(search_df)} rows")
        data['search_terms'] = search_df

    # Step 5: Generate Excel workbook
    logger.info("\n📗 Step 5: Generating Campaigns-9 Excel workbook...")

    excel_gen = ExcelGenerator()
    output_file = excel_gen.generate_workbook(data)

    # Step 6: Log summary
    logger.info("\n" + "=" * 60)
    logger.success("✅ WEEKLY AUTOMATION COMPLETED SUCCESSFULLY")
    logger.info("=" * 60)
    logger.info(f"📁 Output file: {output_file}")

    # Calculate some summary stats
    if campaigns_df is not None:
        total_spend = campaigns_df['cost'].sum() if 'cost' in campaigns_df.columns else 0
        total_orders = campaigns_df['purchases'].sum() if 'purchases' in campaigns_df.columns else 0
        total_sales = campaigns_df['sales'].sum() if 'sales' in campaigns_df.columns else 0

        logger.info("\n📊 Weekly Summary:")
        logger.info(f"   Total Spend: ${total_spend:,.2f}")
        logger.info(f"   Total Orders: {int(total_orders):,}")
        logger.info(f"   Total Sales: ${total_sales:,.2f}")
        if total_sales > 0:  # guard the denominator, not the numerator
            logger.info(f"   Overall ACOS: {(total_spend / total_sales) * 100:.2f}%")

    logger.info("\n🏁 Done! Ready for your analysis and TODO list.")

def _run_test_mode(start_date: datetime, end_date: datetime):
    """Run in test mode - generates sample workbook without API calls"""

    logger.info("🧪 Creating sample workbook with placeholder data...")

    excel_gen = ExcelGenerator()

    # Create empty data dict - generator will use placeholders
    data = {}

    output_file = excel_gen.generate_workbook(data)

    logger.success(f"✅ Test workbook created: {output_file}")
    logger.info("   When your API credentials are ready, remove the --test flag to get real data.")

if __name__ == "__main__":
    main()
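The summary step divides spend by sales to get ACOS, and the same zero-sales guard matters when you compute ACOS per campaign row. A minimal sketch of a per-row ACOS column (pandas; the `cost`/`sales` column names are assumed to match the report schema):

```python
import pandas as pd

def add_acos(df: pd.DataFrame) -> pd.DataFrame:
    """Add an ACOS column (spend / sales * 100), leaving NaN where sales are zero."""
    out = df.copy()
    # mask() turns zero-sales rows into NA so the division never raises or inflates
    out["acos"] = (out["cost"] / out["sales"].mask(out["sales"] == 0)) * 100
    return out

campaigns = pd.DataFrame({
    "campaign": ["A", "B", "C"],
    "cost":  [50.0, 20.0, 10.0],
    "sales": [200.0, 0.0, 40.0],
})
result = add_acos(campaigns)
print(result[["campaign", "acos"]])  # A: 25.0, B: NaN (no sales), C: 25.0
```

Leaving zero-sales campaigns as NaN (rather than 0%) keeps them visible as "spend with no return" in the workbook instead of looking like perfect performers.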

scripts/schedule_weekly.py (Optional - for automated scheduling)

python
#!/usr/bin/env python3
"""
Scheduler for weekly automation
Run this as a background process or use with cron/Task Scheduler
"""

import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

import schedule
import time
from datetime import datetime
from loguru import logger

from src.logger_setup import setup_logger

logger = setup_logger()

def run_weekly_job():
    """Wrapper function to run the main script"""
    logger.info("🕒 Scheduled weekly job triggered")

    # Import here to avoid circular imports
    import subprocess

    # Run the main script
    result = subprocess.run(
        [sys.executable, "scripts/run_weekly.py"],
        capture_output=True,
        text=True
    )

    if result.returncode == 0:
        logger.success("✅ Scheduled job completed successfully")
        logger.debug(f"Output: {result.stdout}")
    else:
        logger.error(f"❌ Scheduled job failed: {result.stderr}")

def main():
    """Set up and run the scheduler"""

    logger.info("⏰ Starting Amazon Ads weekly scheduler")

    # Schedule for every Friday at 9:00 AM
    schedule.every().friday.at("09:00").do(run_weekly_job)

    # Also run immediately on startup (for testing)
    # schedule.every(1).minutes.do(run_weekly_job)  # For testing every minute

    logger.info("Scheduler running. Next run: Friday at 09:00")
    logger.info("Press Ctrl+C to stop")

    try:
        while True:
            schedule.run_pending()
            time.sleep(60)  # Check every minute
    except KeyboardInterrupt:
        logger.info("🛑 Scheduler stopped by user")

if __name__ == "__main__":
    main()
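If you would rather not keep a Python process running, cron can provide the same Friday 09:00 cadence. A sketch of the crontab entry (the project path is an assumption; adjust it to your checkout, and note the redirect so cron output lands in your logs directory):

```
# min hour day-of-month month day-of-week  command
0 9 * * 5  cd "$HOME/amazon-ads-automation" && ./run.sh >> logs/cron.log 2>&1
```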

Part 6: Testing Scripts

tests/test_placeholder.py

python
#!/usr/bin/env python3
"""
Test script to verify everything works without API credentials
"""

import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from src.excel_generator import ExcelGenerator
from src.logger_setup import setup_logger

logger = setup_logger()

def test_placeholder_generation():
    """Test that we can generate a placeholder workbook"""

    logger.info("🧪 Testing placeholder workbook generation")

    excel_gen = ExcelGenerator()
    output_file = excel_gen.generate_workbook({})

    # A real assertion, so the test fails loudly if nothing was written
    assert os.path.exists(output_file), f"Workbook not found: {output_file}"

    logger.success(f"✅ Test passed! Workbook created: {output_file}")
    return True

if __name__ == "__main__":
    test_placeholder_generation()

Part 7: Make Scripts Executable

bash
# Make the Python scripts executable
chmod +x scripts/run_weekly.py
chmod +x scripts/schedule_weekly.py
chmod +x tests/test_placeholder.py

# Create a convenience runner script
cat > run.sh << 'EOF'
#!/bin/bash
source venv_312/bin/activate
python scripts/run_weekly.py "$@"
EOF

chmod +x run.sh

Part 8: README and Documentation

README.md

markdown
# Amazon Ads Weekly Automation

## Overview
This system automates the weekly collection of Amazon Advertising data and generates a beautifully formatted Excel workbook in your "Campaigns-9" style.

## Features
- ✅ PyAirbyte-powered - handles all API complexity automatically
- ✅ Secure credential management with .env
- ✅ Beautiful logging with loguru
- ✅ Campaigns-9 style workbook with proper formatting
- ✅ TODO list section preserved
- ✅ Test mode for pre-approval development
- ✅ Scheduled execution (optional)

## Quick Start

### 1. Setup Environment
```bash
python3 -m venv venv_312
source venv_312/bin/activate
pip install -r requirements.txt
```

### 2. Configure Credentials

Edit `.env` with your API credentials once approved.

### 3. Test Without Credentials

```bash
python scripts/run_weekly.py --test
```

### 4. Run Real Automation

```bash
python scripts/run_weekly.py
```

### 5. Schedule Weekly Runs

```bash
# Option 1: Use the built-in scheduler
python scripts/schedule_weekly.py

# Option 2: Use cron (macOS/Linux)
# Add to crontab: 0 9 * * 5 cd /path/to/project && ./run.sh

# Option 3: Use Windows Task Scheduler
```
## File Structure

```
amazon-ads-automation/
├── .env                    # Your credentials (never commit!)
├── requirements.txt        # Dependencies
├── config/
│   └── settings.py         # Configuration settings
├── src/
│   ├── auth.py             # Credential management
│   ├── api_client.py       # PyAirbyte client
│   ├── excel_generator.py  # Excel formatting
│   └── logger_setup.py     # Logging setup
├── scripts/
│   ├── run_weekly.py       # Main automation script
│   └── schedule_weekly.py  # Optional scheduler
├── tests/
│   └── test_placeholder.py # Pre-approval testing
└── output/
    └── reports/            # Generated Excel files
```

## What PyAirbyte Handles For You

- 🔄 OAuth2 authentication and token refresh
- ⏱️ Report generation polling (up to 30 minutes)
- 📊 Pagination of large result sets
- 🚦 Rate limiting and retries
- 💾 Automatic caching with DuckDB
- 🔌 20+ pre-built data streams

### Final Step: Test It Now!

Even without API credentials, you can verify everything works:

```bash
# 1. Activate virtual environment
source venv_312/bin/activate

# 2. Run the test
python tests/test_placeholder.py

# 3. Run the main script in test mode
python scripts/run_weekly.py --test

# 4. Check the output
ls -la output/reports/
```

You should see a beautifully formatted Excel file with your Campaigns-9 structure, complete with TODO lists and proper formatting!

What's Next?

Once your API credentials are approved:

  1. Update .env with your actual CLIENT_ID, CLIENT_SECRET, REFRESH_TOKEN, and PROFILE_IDS
  2. Run python scripts/run_weekly.py to get real data
  3. The script will automatically:
    • Connect to Amazon Ads API
    • Request the last 7 days of data
    • Wait for reports to generate (PyAirbyte handles this)
    • Download and transform the data
    • Generate your Campaigns-9 Excel workbook

The beauty of PyAirbyte is that you don't need to write any code for the API complexities - it's all handled for you!
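The "last 7 days of data" request above can be sketched with plain `datetime`, assuming the window ends yesterday so that only complete reporting days are requested:

```python
from datetime import date, timedelta

def last_seven_days(today: date) -> tuple[date, date]:
    """Return (start, end): the 7 complete days before `today`, inclusive."""
    end = today - timedelta(days=1)   # yesterday is the last complete day
    start = end - timedelta(days=6)   # 7 days total, inclusive of both ends
    return start, end

start, end = last_seven_days(date(2024, 6, 14))  # e.g. a Friday 09:00 run
print(start.isoformat(), "to", end.isoformat())
# 2024-06-07 to 2024-06-13
```

Anchoring the window on yesterday avoids mixing a partial current day into the weekly totals, which would make week-over-week comparisons misleading.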

If you ever need to sanity-check a refresh token by hand, you can exchange it for an access token directly against Amazon's LWA token endpoint. Never paste real credentials into scripts or documents; the values below are placeholders:

bash
curl -X POST \
  https://api.amazon.com/auth/o2/token \
  -d "grant_type=refresh_token" \
  -d "refresh_token=YOUR_REFRESH_TOKEN" \
  -d "client_id=YOUR_CLIENT_ID" \
  -d "client_secret=YOUR_CLIENT_SECRET"