Building a Real-time Economic Indicator (REI) Tracker with SearchApi and Python


1. Introduction: The Problem of Latency in Official Statistics

Official economic metrics, such as the Consumer Price Index (CPI), are foundational to understanding macro trends. However, they share a structural weakness: latency. Official figures are published well after the period they measure, so by the time they are released, prices on the shelf may already have moved.

For developers and analysts, closing this gap means collecting price data directly. This project builds a Real-time Economic Indicator (REI) Tracker. By monitoring the online listing prices of a small basket of goods in a specific metropolitan area (Osaka, Japan), it aims to surface inflationary or deflationary trends before they appear in official reports, potentially by weeks.

2. Bypassing Infrastructure Hurdles with SearchApi

Extracting high-quality, high-frequency data from search engines and shopping platforms is notoriously difficult due to sophisticated anti-bot protections, IP blocks, and CAPTCHAs.

SearchApi provides the data-collection layer for this project. It handles proxy rotation, browser rendering, and CAPTCHA solving, so we can skip the mechanics of web scraping and focus on interpreting the data.

3. Complete Source Code

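Below is the full Python implementation. It fetches listings through SearchApi, reduces each item's prices to a median, and saves the results to a monthly CSV file with pandas. Rows already recorded for the current day are replaced, so rerunning the script on the same day does not create duplicates.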

import requests
import statistics
import pandas as pd
from datetime import datetime, date
import os
import re
import logging
from pathlib import Path
from dataclasses import dataclass
from typing import List, Dict, Optional
from dotenv import load_dotenv

# ====================== Settings ======================
load_dotenv()

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s | %(levelname)s | %(message)s',
    datefmt='%H:%M:%S'
)
logger = logging.getLogger(__name__)

API_KEY = os.getenv("SEARCHAPI_API_KEY")
if not API_KEY:
    raise ValueError("SEARCHAPI_API_KEY is not set in the .env file!")

@dataclass
class Config:
    target_items: Optional[List[str]] = None  # must be set when constructing Config
    location: str = "Osaka, Osaka, Japan"
    gl: str = "jp"
    hl: str = "ja"
    num_results: int = 20
    min_samples: int = 3

# Tracked products
config = Config(
    target_items=[
        "Egg 10-pack",
        "Rice 5kg",
        "Tissue paper 5-pack",
        "Gasoline price",
        "iPhone 15 128GB",
    ]
)

class EconomicIndicatorTracker:
    def __init__(self, api_key: str, config: Config):
        self.api_key = api_key
        self.config = config
        self.endpoint = "https://www.searchapi.io/api/v1/search"
        self.session = requests.Session()

    @staticmethod
    def parse_price(price_str: str) -> Optional[int]:
        """Convert a price such as '¥1,980' or '1,980円' to integer yen; return None if it cannot be parsed."""
        if not price_str:
            return None
        cleaned = str(price_str).replace('円', '').replace('¥', '').replace(' ', '')
        cleaned = re.sub(r'[^\d.,]', '', cleaned)
        cleaned = cleaned.replace(',', '')
        try:
            return int(float(cleaned))
        except ValueError:
            return None

    def get_market_price(self, query: str) -> Optional[Dict]:
        """Fetch Google Shopping listings for a query and summarize their prices."""
        params = {
            "engine": "google_shopping",
            "q": query,
            "location": self.config.location,
            "api_key": self.api_key,
            "gl": self.config.gl,
            "hl": self.config.hl,
            "num": self.config.num_results,
        }

        try:
            response = self.session.get(self.endpoint, params=params, timeout=20)
            response.raise_for_status()
            data = response.json()

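            prices = []
            for item in data.get("shopping_results", []):
                # Prefer the numeric extracted_price when present; the display
                # string in "price" may carry extra text, so it is the fallback.
                raw_price = item.get("extracted_price") or item.get("price")
                if raw_price:
                    parsed = self.parse_price(raw_price)
                    if parsed and parsed > 0:
                        prices.append(parsed)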

            if len(prices) < self.config.min_samples:
                logger.warning(f"There are too few valid samples for {query} ({len(prices)} items).")
                return None

            return {
                "item": query,
                "date": date.today().isoformat(),
                "timestamp": datetime.now().isoformat(),
                "median_price": round(statistics.median(prices)),
                "sample_count": len(prices),
                "min_price": min(prices),
                "max_price": max(prices)
            }
        except Exception as e:
            logger.error(f"Data acquisition error {query}: {e}")
        return None

def main():
    tracker = EconomicIndicatorTracker(API_KEY, config)
    results = []

    print(f"--- REI price survey: {config.location} ({date.today().isoformat()}) ---")
    
    for item in config.target_items:
        logger.info(f"Fetching: {item}...")
        stats = tracker.get_market_price(item)
        if stats:
            results.append(stats)
            logger.info(f"   → median: ¥{stats['median_price']:,} ({stats['sample_count']} samples)")

    if results:
        df_new = pd.DataFrame(results)
        csv_file = f"economic_indicator_{datetime.now().strftime('%Y%m')}.csv"
        
        # CSV persistence logic (preventing duplicate entries for the same day)
        if Path(csv_file).exists():
            df_existing = pd.read_csv(csv_file)
            df_existing = df_existing[df_existing['date'] != date.today().isoformat()]
            df_combined = pd.concat([df_existing, df_new], ignore_index=True)
        else:
            df_combined = df_new

        df_combined.to_csv(csv_file, index=False)
        logger.info(f"💾 Saved to CSV: {csv_file}")
        
        print("\n" + "="*50)
        print("### Market Price Summary ###")
        print(df_new[['item', 'median_price', 'sample_count']].to_string(index=False))
        print("="*50)
    else:
        logger.error("No data was collected.")

if __name__ == "__main__":
    main()
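
To get a feel for what parse_price accepts before spending API credits, a condensed standalone copy of the function can be run on its own. This is a sketch for experimentation rather than part of the tracker: the sample inputs are made up, and the single regular expression is assumed to be equivalent to the chained replace calls in the listing above.

```python
import re
from typing import Optional


def parse_price(price_str) -> Optional[int]:
    """Condensed standalone copy of the tracker's parse_price (for experimentation)."""
    if not price_str:
        return None
    # Keep only digits, dots, and commas, then drop thousands separators.
    cleaned = re.sub(r'[^\d.,]', '', str(price_str)).replace(',', '')
    try:
        return int(float(cleaned))
    except ValueError:
        # Nothing numeric survived the cleanup (e.g. a "free shipping" label).
        return None


# Hypothetical inputs: display strings, a numeric value, and two unparseable cases.
samples = ["¥1,980", "2,480円", "￥ 12,800", 598.0, "", "送料無料"]
parsed = [parse_price(s) for s in samples]
print(parsed)  # [1980, 2480, 12800, 598, None, None]
```

Note that commas are always treated as thousands separators, which suits yen prices but would misread formats that use a comma as the decimal mark.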

4. Analytical Methodology

  • Median Pricing: Using the median rather than the mean reduces the influence of extreme outliers (such as steep discounts, used items, or data-entry errors), giving a more stable estimate of the “market middle”.

  • Localized Context: Setting the location parameter to Osaka targets results as seen from that area, so the series can reflect regional supply chain and logistics effects rather than a national average.

  • Data Persistence: Each run writes to a per-month CSV file (economic_indicator_YYYYMM.csv), replacing any rows already saved for the current day. Over time these files form a historical baseline for Month-over-Month (MoM) and Year-over-Year (YoY) analysis.
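
The first and third points can be illustrated with a short standard-library sketch. All numbers, dates, and the helper names monthly_medians and pct_change are hypothetical; the CSV text uses only a subset of the columns the tracker writes, and taking the median of each month's daily medians is one possible aggregation, not something the script itself does.

```python
import csv
import io
import statistics
from collections import defaultdict

# --- Median vs. mean on one day's listings (hypothetical yen prices) ---
# The last value stands in for a bulk or mislabeled listing.
prices = [248, 258, 265, 270, 278, 9800]
mean_price = statistics.mean(prices)      # pulled upward by the outlier
median_price = statistics.median(prices)  # stays near the typical listing
print(f"mean={mean_price:.1f}  median={median_price:.1f}")

# --- Month-over-month change from daily rows (hypothetical data) ---
ROWS = """item,date,median_price
Rice 5kg,2025-05-10,4000
Rice 5kg,2025-05-24,4100
Rice 5kg,2025-06-07,4200
Rice 5kg,2025-06-21,4305
"""


def monthly_medians(csv_text: str) -> dict:
    """Map (item, 'YYYY-MM') to the median of that month's daily medians."""
    buckets = defaultdict(list)
    for row in csv.DictReader(io.StringIO(csv_text)):
        month = row["date"][:7]  # ISO dates, so the first 7 chars are YYYY-MM
        buckets[(row["item"], month)].append(float(row["median_price"]))
    return {key: statistics.median(vals) for key, vals in buckets.items()}


def pct_change(previous: float, current: float) -> float:
    """Percentage change from previous to current."""
    return (current - previous) / previous * 100.0


medians = monthly_medians(ROWS)
mom = pct_change(medians[("Rice 5kg", "2025-05")], medians[("Rice 5kg", "2025-06")])
print(f"Rice 5kg MoM: {mom:+.1f}%")
```

In the first part, a single outlier moves the mean far from the typical listing while the median barely shifts, which is the reason for the median choice. For real data, the same grouping could be applied after concatenating the monthly CSV files.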

5. Open Source Repository & Quick Start

The complete project files and deployment guide are available on GitHub.

  1. Clone & Install: Clone the repository and run pip install -r requirements.txt.

  2. Get SearchApi Key: Sign up at SearchApi.io and copy your API key from the developer dashboard.

  3. Environment Setup: Create a .env file in the root folder and add your key: SEARCHAPI_API_KEY=your_api_key_here.

  4. Run: Execute python rei_tracker.py to collect the day's prices and write them to the monthly CSV file.

👉 https://github.com/kobayashikazu/rei-tracker-osaka
