MENU

【Python】競馬情報のスクレイピング レース結果の取得

はじめに

競馬のデータ分析をテーマに、オッズやレース結果、騎手の成績などの情報を効率的に取得します。

目次

range_loader.py

ソースコード

from datetime import datetime

from .UI_helper import print_message


def range_loader(y1, m1, y2, m2, master=None):
    """
    年・月の範囲を取得し、有効な範囲に修正した後、指定範囲の [year, month] のリストを生成する。
    ただし、2008年未満 または 現在より未来の年月は `None` を返す。

    :param y1: 開始年
    :param m1: 開始月
    :param y2: 終了年
    :param m2: 終了月
    :param master: GUIのメッセージ表示オブジェクト(デフォルト: None)
    :return: 指定範囲の [year, month] のリスト または `None`(無効な入力)
    """
    # 入力値のバリデーション(非数値なら `None`)
    if any(not str(value).isdigit() for value in [y1, m1, y2, m2]):
        print_message(
            master, "エラー: 年または月の入力が無効です。整数を入力してください。"
        )
        return None

    # `int` に変換
    y1, m1, y2, m2 = int(y1), int(m1), int(y2), int(m2)

    # 現在の年月を取得
    now = datetime.now()

    # 年のバリデーション
    y1_valid, y2_valid = constrain_year(y1, now), constrain_year(y2, now)
    if y1_valid is None or y2_valid is None:  # 2008未満 or 現在より未来
        print_message(
            master, "エラー: 年は 2008年 以上 現在年以下 で入力してください。"
        )
        return None

    # 月のバリデーション
    m1, m2 = constrain_month(m1), constrain_month(m2)

    # 未来の年月のチェック
    m1_valid, m2_valid = (
        constrain_latest_month(y1_valid, m1, now),
        constrain_latest_month(y2_valid, m2, now),
    )
    if m1_valid is None or m2_valid is None:  # 未来の年月が指定された場合
        print_message(
            master,
            "エラー: 未来の年月は指定できません。現在以前の年月を選択してください。",
        )
        return None

    # 年と月の順序を正規化
    y1, y2 = min(y1_valid, y2_valid), max(y1_valid, y2_valid)
    if y1 == y2:
        m1, m2 = min(m1_valid, m2_valid), max(m1_valid, m2_valid)

    # 年・月のリストを生成
    ym_list = [
        [y, m]
        for y in range(y1, y2 + 1)
        for m in range(1, 13)
        if (y != y1 or m >= m1) and (y != y2 or m <= m2)
    ]

    # データ範囲をメッセージで表示(同じ年月の場合は表示しない)
    if ym_list:
        start_year, start_month = ym_list[0]
        end_year, end_month = ym_list[-1]
        if start_year != end_year or start_month != end_month:
            print_message(
                master,
                f"{start_year}年{start_month}月から{end_year}年{end_month}月までのデータを取得します。",
            )
    return ym_list


def constrain_year(year, now):
    """
    指定された年を 2008年 から 現在年 の範囲に制限する。
    2008年未満 または 現在年より未来の年は `None` を返す。

    :param year: 入力された年
    :param now: 現在の datetime オブジェクト
    :return: 有効な年 または `None`
    """
    if not isinstance(year, int) or year < 2008 or year > now.year:
        return None
    return year


def constrain_month(month):
    """
    指定された月を 1 から 12 の範囲に制限する。

    :param month: 入力された月
    :return: 1〜12 の範囲の月
    """
    return max(1, min(month, 12))


def constrain_latest_month(year, month, now):
    """
    指定された年が現在年より未来の場合、または現在年かつ現在月より未来の場合 `None` を返す。
    そうでない場合はそのままの月を返す。

    :param year: 入力された年
    :param month: 入力された月
    :param now: 現在の datetime オブジェクト
    :return: 制限された月 または `None`(未来の場合)
    """
    if year > now.year or (year == now.year and month > now.month):
        return None
    return month

range_loader の概要

  • 入力された年・月が整数であるかを検証。
  • 2008年から現在までの範囲に収まるように年を制限。
  • 1〜12の範囲に収まるように月を制限。
  • 未来の年月を選択できないように制限。
  • print_message() を用いて、エラーメッセージやデータ取得範囲の通知をGUIに出力。

range_loader() 関数の動作

  • y1, m1, y2, m2 を数値として検証し、整数に変換。
  • constrain_year() 関数を用いて、2008年から現在年の範囲に制限。
  • constrain_month() を用いて、1〜12の範囲に制限。
  • constrain_latest_month() を使用し、未来の年月が選択されないように制御。
  • 年・月の範囲を正規化し、適切なリストを生成。

各バリデーション関数の説明

constrain_year(year, now)

  • 入力された年を2008年から現在年までの範囲に制限。
  • 範囲外の年が入力された場合は None を返す。

constrain_month(month)

  • 入力された月を1〜12の範囲に制限。
  • 1未満や12を超える値が指定された場合、自動的に補正。

constrain_latest_month(year, month, now)

  • 指定された年・月が未来の日付にならないように制限。
  • 未来の年月が指定された場合は None を返す。

date_scraper.py

ソースコード

import re

import requests
from bs4 import BeautifulSoup
from fake_useragent import UserAgent

from Libs.Constants import UrlPaths

from .UI_helper import print_message, update_progress


def date_scraper(ym_list, master=None):
    """
    レース開催日一覧を取得する関数。

    :param ym_list: [(year, month)] のリスト
    :param master: GUIオブジェクト(プログレスバー更新用)
    :return: 開催日一覧(ソート済み)
    """
    print_message(master, "開催日を取得しています...... ", line_brake=False)

    date_list = []
    total_ym = len(ym_list)
    user_agent = {"User-Agent": UserAgent().chrome}  # ユーザーエージェントの初期化

    for count, (year, month) in enumerate(ym_list, start=1):
        url = f"{UrlPaths.CALENDAR_URL}?year={year}&month={month}"
        resp = get_resp(url, user_agent)

        if resp:
            date_list.extend(extract_dates(resp))

        # プログレスバーの更新
        update_progress(master, count, total_ym)

    update_progress(master, 0, 1)  # 進捗バーをリセット

    print_message(master, "完了")

    return sorted(date_list)


def get_resp(url, headers, max_attempts=5, wait_time=0.5):
    """
    URL から HTML レスポンスを取得する関数。

    :param url: 取得するURL
    :param headers: リクエストヘッダー(User-Agent)
    :param max_attempts: 最大試行回数(デフォルト: 5)
    :param wait_time: リクエストの間隔(デフォルト: 0.5秒)
    :return: レスポンスオブジェクト or None(失敗時)
    """
    for _ in range(max_attempts):
        try:
            resp = requests.get(url, headers=headers, timeout=10)
            if resp.status_code == 200:
                resp.encoding = "EUC-JP"
                return resp
        except requests.RequestException:
            continue  # エラー時はリトライ
    return None


def extract_dates(resp):
    """
    レスポンスから日付情報を抽出する関数。

    :param resp: requestsのレスポンスオブジェクト
    :return: 日付のリスト
    """
    soup = BeautifulSoup(resp.text, "lxml")
    return [
        match.group()
        for elem in soup.find_all(href=re.compile("date"))
        if (match := re.search(r"[0-9]{8}", elem.get("href", "")))
    ]

date_scraper 関数の概要

  • date_scraper 関数は、指定された ym_list(年と月のリスト)を処理し、開催日を取得します。
  • requests を使用してWebページのHTMLを取得。
  • BeautifulSoup によりHTMLを解析し、開催日データを抽出。
  • update_progress() を用いてGUIの進捗バーを更新。
  • 開催日リストをソートし、結果を返します。

get_resp 関数の概要

  • 指定されたURLからHTMLを取得。
  • 最大5回までリクエストを試行し、成功すればレスポンスを返す。
  • タイムアウトやネットワークエラー時はリトライを実行。

extract_dates 関数の概要

  • BeautifulSoup でHTMLを解析。
  • href 属性に「date」という文字列を含むリンクを検索。
  • 正規表現を使用し、日付フォーマット(YYYYMMDD)を抽出。
  • 抽出した日付のリストを返す。

race_id_scraper.py

ソースコード

import pickle
import re
from datetime import datetime, timedelta
from pathlib import Path

from bs4 import BeautifulSoup

from Libs.Constants import UrlPaths, data_dir_path

from .chromedriver import ChromeDriver
from .UI_helper import print_message, update_progress  # `ui_helper` をインポート


class RaceIdScraper:
    def __init__(self, master=None, overwrite=True):
        """
        レースIDをスクレイピングするクラス。

        :param master: GUIのオブジェクト(progressbar や msgbox を持つもの)
        :param overwrite: 既存のデータを上書きするかどうか(デフォルト: True)
        """
        self.master = master
        self.overwrite = overwrite
        self.limit = self.get_nearest_saturday()
        self.race_id_dir_path = Path(data_dir_path()) / "RaceId"
        self.race_id_dir_path.mkdir(parents=True, exist_ok=True)  # ディレクトリ作成
        self.driver = ChromeDriver()
        self.driver.implicitly_wait(60)
        self.attempts = 5  # 最大リトライ回数

    def get_nearest_saturday(self):
        """最も近い土曜日の日付を YYYYMMDD 形式で取得"""
        base_date = datetime.now()
        day_offset = (5 - base_date.weekday()) % 7
        nearest_saturday = base_date + timedelta(days=day_offset)
        return nearest_saturday.strftime("%Y%m%d")

    def load_existing_ids(self, filepath, date_group):
        """既存のレースIDリストを読み込む"""
        if filepath.exists():
            with open(filepath, "rb") as f:
                id_list = pickle.load(f)

            if self.overwrite:
                id_list = [
                    item for item in id_list if item[1] not in date_group
                ]  # date を使う

            ids = {i[0] for i in id_list}  # 高速検索のため set を使用
        else:
            id_list, ids = [], set()

        return id_list, ids

    def fetch_page_source(self, url):
        """URLのページソースを取得する(リトライ機能付き)"""
        for _ in range(self.attempts):
            try:
                self.driver.get(url)
                return self.driver.page_source
            except Exception:
                continue
        return None  # 取得失敗

    def extract_race_ids(self, html, ids, date):
        """HTMLからレースIDを抽出し、既存のものはスキップ"""
        soup = BeautifulSoup(html, "lxml")
        new_ids = []
        for e in soup.find_all(href=re.compile("race_id")):
            text = str(e.attrs["href"])
            race_id = re.search(r"[0-9]{12}", text).group()
            if race_id not in ids:
                ids.add(race_id)
                new_ids.append([race_id, date])  # date を保存する
        return new_ids

    def scrape_race_id(self, date_list):
        """レースIDをスクレイピングして取得"""
        date_list = self.split_dates_by_year(date_list)
        for date_group in date_list:
            year = re.match(r"[0-9]{4}", date_group[0]).group(0)
            print_message(
                self.master,
                f"{year}年のレースIDを取得しています...... ",
                line_brake=False,
            )  # `ui_helper` を使用

            filepath = self.race_id_dir_path / f"race_id_{year}.pkl"
            id_list, ids = self.load_existing_ids(filepath, date_group)

            total_dates = len(date_group)
            for count, date in enumerate(date_group, start=1):
                if date == self.limit:
                    break  # リミットに到達したら終了

                # URL作成
                url = f"{UrlPaths.RACE_LIST_URL}?&kaisai_date={date}"
                if not self.overwrite and any(date in item for item in id_list):
                    update_progress(
                        self.master, count, total_dates
                    )  # `ui_helper` を使用
                    continue

                # ページ取得
                html = self.fetch_page_source(url)
                if not html:
                    continue

                # レースID抽出
                new_ids = self.extract_race_ids(html, ids, date)
                id_list.extend(new_ids)

                update_progress(self.master, count, total_dates)  # `ui_helper` を使用

            # ファイルに保存
            id_list.sort()
            print_message(self.master, "完了")  # `ui_helper` を使用
            with open(filepath, "wb") as f:
                pickle.dump(id_list, f)

        self.driver.quit()
        update_progress(self.master, 0, 1)  # `ui_helper` を使用(進捗バーリセット)

    def split_dates_by_year(self, date_list):
        """日付リストを年ごとに分類する"""
        date_dict = {}
        for date in date_list:
            year = re.match(r"[0-9]{4}", date).group(0)
            date_dict.setdefault(year, []).append(date)
        return list(date_dict.values())

RaceIdScraper クラスの概要

  • RaceIdScraper クラスは、WebサイトからレースIDを取得し、ファイルに保存する機能を提供します。
  • ChromeDriver を使用してWebページを開き、レースIDの抽出を行います。
  • update_progress() を用いてGUIの進捗バーを更新。

get_nearest_saturday メソッドの概要

  • 現在の日付を基に、最も近い土曜日の日付を計算。
  • 計算された日付を YYYYMMDD 形式で返します。

fetch_page_source メソッドの概要

  • 指定したURLのページソースを取得。
  • 最大5回のリトライを行い、成功すればHTMLを返す。
  • 失敗した場合は None を返す。

extract_race_ids メソッドの概要

  • BeautifulSoup を用いてHTMLを解析。
  • href 属性に race_id を含むリンクを検索。
  • 正規表現を使用してレースID(12桁の数値)を抽出し、リストに保存。

scrape_race_id メソッドの概要

  • 指定された日付のリストを処理し、レースIDを取得。
  • 取得したIDをファイルに保存。
  • 進捗状況を print_message() で表示し、update_progress() で更新。
  • スクレイピング処理完了後、ChromeDriver を終了。

split_dates_by_year メソッドの概要

  • 日付リストを年ごとに分類。
  • 各年ごとにグループ化し、処理しやすい形に変換。

chromedriver.py

ソースコード

from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager


def ChromeDriver():
    driverpath = ChromeDriverManager().install()
    options = webdriver.ChromeOptions()
    options.add_argument("--headless=new")
    driver = webdriver.Chrome(service=Service(driverpath), options=options)
    driver.set_window_size(974, 1039)
    return driver

ChromeDriver 関数の概要

  • ChromeDriver 関数は webdriver_manager を使用して最新の ChromeDriver をインストールし、セットアップします。
  • webdriver.ChromeOptions を利用して、ブラウザをヘッドレスモードで起動。
  • set_window_size() を用いてブラウザのウィンドウサイズを設定。

ChromeDriverManager().install() の概要

  • ChromeDriverManager を使用することで、手動で ChromeDriver をダウンロード・設定する手間を省略。
  • 常に最新バージョンの ChromeDriver を自動インストール。

webdriver.ChromeOptions の設定

  • add_argument("--headless=new") により、GUIなしでブラウザを起動。
  • set_window_size(974, 1039) で固定サイズのウィンドウを設定。

race_html_fetcher.py

ソースコード

import pickle

from fake_useragent import UserAgent

from Libs.Constants import UrlPaths, data_dir_path

from .requests_helper import get_resp
from .UI_helper import print_message, update_progress
from .utils import organize_years


def race_html_fetcher(ym_list, master=None, overwrite=True):
    """
    レース結果ページのHTMLを取得し、pickle で保存する関数。

    :param ym_list: [[yyyy, mm], [yyyy, mm], ...] のリスト
    :param master: GUIの `master` オブジェクト(`msgbox` と `progressbar` を管理)
    :param overwrite: 既存データを上書きするかどうか
    """
    user_agent = {"User-Agent": UserAgent().chrome}
    years, grouped_by_year = organize_years(ym_list)

    for y, ym_list in zip(years, grouped_by_year):
        print_message(
            master,
            f"{y}年のレース結果ページのHTMLを取得しています。...... ",
            line_brake=False,
        )

        # HTMLを保存するディレクトリを作成
        html_dir_path = data_dir_path() / "HTML" / "Result" / f"Result_{y}"
        html_dir_path.mkdir(parents=True, exist_ok=True)

        # レースIDファイルを読み込む(ファイルが必ず存在する前提)
        ids_path = data_dir_path() / "RaceID" / f"race_id_{y}.pkl"
        with open(ids_path, "rb") as f:
            ids = pickle.load(f)

        for ym in ym_list:
            # 指定年月のレースIDを抽出
            subids = [id for id in ids if id[1].startswith(ym)]
            total = len(subids)

            for count, (race_id, _) in enumerate(subids, start=1):
                # 保存パスを設定
                html_path = html_dir_path / f"{race_id}_html.pkl"

                # 既に保存済みのHTMLがある場合はスキップ
                if not overwrite and html_path.exists():
                    continue

                # URLを設定
                url = UrlPaths.RACE_URL + race_id

                # レース結果ページのHTMLを取得
                resp = get_resp(url, user_agent)
                if not resp:
                    continue

                # HTMLデータを pickle で保存
                with open(html_path, "wb") as f:
                    pickle.dump(resp, f)

                # 進捗バーを更新
                update_progress(master, count, total)

            # 月ごとの処理が終わったら進捗バーをリセット
            update_progress(master, 0, 1)

        print_message(master, "完了")

race_html_fetcher 関数の概要

  • race_html_fetcher 関数は、指定された年月のレース結果ページのHTMLを取得し、pickle 形式で保存します。
  • requests を使用してWebページのHTMLを取得。
  • update_progress() を用いてGUIの進捗バーを更新。
  • 取得したHTMLを pickle を用いてローカルに保存。

organize_years 関数の概要

  • 年ごとに年月を分類し、処理を整理。
  • race_html_fetcher がデータを適切に処理できるように整形。

get_resp 関数の概要

  • 指定したURLのHTMLを取得。
  • UserAgent を使用してブラウザの振る舞いを模倣。
  • HTTPリクエストのエラーハンドリングを実装。

データ保存処理の概要

  • 取得したレース結果ページのHTMLを pickle で保存。
  • ファイル名にレースIDを使用し、ディレクトリごとに分類。
  • overwrite フラグに基づいて既存データを保持または上書き。

race_html_parser.py

ソースコード

import pickle

from bs4 import BeautifulSoup

from Libs.Constants import data_dir_path

from .parse_info import parse_race_info
from .parse_result import parse_race_result
from .parse_return import parse_race_return
from .UI_helper import print_message, update_progress
from .utils import organize_years, remove_duplicate_errors


def race_html_parser(ym_list, master=None, overwrite=True):
    """
    レースデータのHTMLを解析し、結果・情報・払い戻しをそれぞれ保存する。

    :param ym_list: [[yyyy, mm], [yyyy, mm], ...] のリスト
    :param master: GUIの `master` オブジェクト(`msgbox` と `progressbar` を管理)
    :param overwrite: 既存データを上書きするかどうか
    """
    years, grouped_by_year = organize_years(ym_list)

    for y, ym_list in zip(years, grouped_by_year):
        print_message(
            master, f"{y}年のレースデータを抽出しています... ", line_brake=False
        )

        html_dir_path = data_dir_path() / "HTML" / "Result" / f"Result_{y}"

        # 各データの保存ディレクトリを作成
        data_paths = {
            "result": {
                "data": data_dir_path() / "Raw" / "Result" / f"Result_{y}",
                "error": data_dir_path() / "Raw" / "Result_Error" / f"Result_Error_{y}",
                "function": parse_race_result,
            },
            "info": {
                "data": data_dir_path() / "Raw" / "Information" / f"Information_{y}",
                "error": data_dir_path()
                / "Raw"
                / "Information_Error"
                / f"Information_Error_{y}",
                "function": parse_race_info,
            },
            "return": {
                "data": data_dir_path() / "Raw" / "Return" / f"Return_{y}",
                "error": data_dir_path() / "Raw" / "Return_Error" / f"Return_Error_{y}",
                "function": parse_race_return,
            },
        }

        # ディレクトリ作成
        for paths in data_paths.values():
            paths["data"].mkdir(parents=True, exist_ok=True)
            paths["error"].mkdir(parents=True, exist_ok=True)

        # レースIDファイルを読み込む
        ids_path = data_dir_path() / "RaceID" / f"race_id_{y}.pkl"
        with open(ids_path, "rb") as f:
            ids = pickle.load(f)

        for ym in ym_list:
            subids = [id for id in ids if id[1].startswith(ym)]
            total = len(subids)

            for count, (race_id, _) in enumerate(subids, start=1):
                html_path = html_dir_path / f"{race_id}_html.pkl"

                if not html_path.exists():
                    print_message(
                        master,
                        f"{race_id} のHTMLファイルが見つかりません。スキップします。",
                    )
                    continue

                with open(html_path, "rb") as f:
                    resp = pickle.load(f)

                soup = BeautifulSoup(resp.text, "lxml")

                # 各データを処理
                for key, paths in data_paths.items():
                    data_file = paths["data"] / f"{race_id}_{key}_df.pkl"
                    error_file = paths["error"] / f"{race_id}_{key}_error.pkl"

                    if not overwrite and data_file.exists():
                        continue

                    paths["function"](resp, soup, data_file, error_file)

                update_progress(master, count, total)

            update_progress(master, 0, 1)

        # エラーデータの削除
        for key, paths in data_paths.items():
            remove_duplicate_errors(key, paths["data"], paths["error"])

        print_message(master, "完了")

race_html_parser 関数の概要

  • race_html_parser 関数は、レースのHTMLデータを解析し、結果・情報・払い戻しを抽出して保存します。
  • BeautifulSoup を使用してHTMLを解析。
  • update_progress() を用いてGUIの進捗バーを更新。
  • pickle を使用して抽出したデータを保存。

organize_years 関数の概要

  • 指定された年月のリストを年ごとに分類し、処理を整理。
  • race_html_parser がデータを適切に処理できるように整形。

parse_race_result, parse_race_info, parse_race_return の概要

  • parse_race_result: レース結果データを解析し、保存。
  • parse_race_info: レース情報を解析し、保存。
  • parse_race_return: 払い戻しデータを解析し、保存。

データ保存処理の概要

  • 取得したデータを pickle で保存。
  • 各データを Result, Information, Return のフォルダに分類。
  • overwrite フラグに基づいて既存データを保持または上書き。

parse_result.py

ソースコード

import re
from io import StringIO

import pandas as pd

from .utils import save_data_or_error, set_race_id


def parse_race_result(resp, soup, data_file, error_file):
    """
    レース結果データを解析し、正常なら保存、エラーならエラーファイルに保存する。

    :param resp: 取得したHTMLレスポンスデータ
    :param data_file: 正常データの保存パス
    :param error_file: エラーデータの保存パス
    """

    try:
        # レース結果テーブルデータを抽出
        try:
            df = pd.read_html(StringIO(str(soup)))[0]
        except IndexError:
            dfs = [
                pd.read_html(StringIO(str(t)))[0]
                for t in soup.select("table:has(tr td)")
            ]
            df = dfs[0]

        # 馬IDを抽出
        horseids = []
        elems = soup.find_all("a", href=re.compile(r".+/horse/.*\d*/$"))
        for e in elems:
            id = re.search(r"horse/(.*\d*)/", e["href"]).group(1)
            horseids.append(id)
        df["horse_id"] = horseids

        # 騎手IDを抽出
        jockeyids = []
        elems = soup.find_all("a", href=re.compile(r".+/jockey/.*\d*/$"))
        for e in elems:
            id = re.search(r"jockey/(.*\d*)/", e["href"]).group(1)
            jockeyids.append(id)
        df["jockey_id"] = jockeyids

        # 調教師IDを抽出
        trainerids = []
        elems = soup.find_all("a", href=re.compile(r".+/trainer/.*\d*/$"))
        for e in elems:
            id = re.search(r"trainer/(.*\d*)/", e["href"]).group(1)
            trainerids.append(id)
        df["trainer_id"] = trainerids

        # 馬主IDを抽出
        ownerids = []
        elems = soup.find_all("a", href=re.compile(r".+/owner/.*\d*/$"))
        for e in elems:
            id = re.search(r"owner/(.*\d*)/", e["href"]).group(1)
            ownerids.append(id)
        df["owner_id"] = ownerids

        df = df.rename(columns=lambda x: x.replace(" ", ""))
        # インデックスを設定
        set_race_id(df, data_file)
    except Exception as e:
        print(e)
        df = None

    save_data_or_error(df, resp, data_file, error_file)

parse_race_result 関数の概要

  • parse_race_result 関数は、レース結果ページのHTMLを解析し、pandas.DataFrame に変換。
  • BeautifulSoup を使用してHTMLを解析。
  • horse_id, jockey_id, trainer_id, owner_id を正規表現で抽出。
  • save_data_or_error() を用いて正常データとエラーデータを管理。

レース結果テーブルの抽出

  • pd.read_html() を用いて、HTML内のテーブルをデータフレームに変換。
  • soup.select("table:has(tr td)") を使用して適切なテーブルを選択。

各種IDの抽出

  • horse_id: 競走馬のIDをリンクから抽出。
  • jockey_id: 騎手のIDをリンクから抽出。
  • trainer_id: 調教師のIDをリンクから抽出。
  • owner_id: 馬主のIDをリンクから抽出。

データの整理と保存

  • カラムの空白を削除して整形。
  • set_race_id() でレースIDを設定。
  • save_data_or_error() を用いてデータの正常性を確認し、適切な場所に保存。

parse_info.py

ソースコード

import re

import pandas as pd
from bs4 import BeautifulSoup

from Libs.Constants.master import Master

from .utils import save_data_or_error, set_race_id


def parse_race_info(resp, soup, data_file, error_file):
    """
    レース情報データを解析し、正常であればデータを保存し、エラーがあればエラーファイルに保存する。

    :param resp: HTTPレスポンスオブジェクト(HTMLテキストを含む)
    :param data_file: 正常データの保存先パス
    :param error_file: エラーデータの保存先パス
    """
    # DataFrameを指定のカラムで初期化
    df = pd.DataFrame(
        columns=[
            "race_class",  # レースクラス(例:新馬、G1、オープンなど)
            "surface",  # 馬場の種類(芝、ダート、障害)
            "date",  # レース日(yyyymmdd)
            "weekday",  # レース開催曜日(例:土、日)
            "weather",  # 天候(例:晴、雨)
            "start_time",  # 発走時刻(HHMM)
            "track_condition",  # 馬場状態(例:良、不良)
            "course_length",  # コースの距離(メートル)
            "course_direction",  # コースの回り(右、左、直線)
        ]
    )

    try:
        # レースクラスを解析しDataFrameにセット
        race_class = race_class_parser(soup)
        for k, v in Master.RACE_CLASS_DICT.items():
            if k in race_class:
                df["race_class"] = [v]
                break

        # 日付と曜日を取得
        date, weekday = date_parser(soup)
        df["date"] = [date]
        df["weekday"] = [weekday]

        # 発走時刻、距離、その他情報を取得
        (
            start_time,
            course_length,
            course_direction,
            surface,
            weather,
            track_condition,
        ) = data_parser(soup)
        df["start_time"] = [start_time]
        df["course_length"] = [course_length]

        # コースの回りを取得
        if course_direction in Master.COURSE_DIRECTION_LIST:
            df["course_direction"] = [course_direction]

        # 馬場の種類(芝・ダート・障害)を判定
        df["surface"] = [Master.SURFACE_DICT[surface]]
        # 障害レースの場合、コース回りを「障害」に設定し、race_classを「障害」に修正
        if "障" in surface:
            df["course_direction"] = ["障害"]
            df["race_class"] = ["障害"]

        # 馬場状態(例:良、不良)を判定
        df["track_condition"] = [track_condition]

        # 天候(例:晴、雨)を判定
        df["weather"] = [weather]

        # インデックスを設定し、データをファイルに保存
        set_race_id(df, data_file)

    except Exception:
        # エラー時はNoneをセットし、エラーログに保存
        df = None

    # 正常時はdata_fileに、エラー時はerror_fileにデータを保存
    save_data_or_error(df, resp, data_file, error_file)


def date_parser(soup):
    """
    HTMLから日付(yyyymmdd)と曜日を取得する関数。

    :param soup: BeautifulSoupオブジェクト
    :return: (日付(yyyymmdd), 曜日)
    """
    # 日付・曜日が含まれるspanタグを取得
    date_span = soup.find("span", class_="Race_Date")
    if date_span:
        date_text = date_span.text.strip()
        # 正規表現で日付(yyyy/mm/dd)と曜日(カッコ内)を抽出
        date_match = re.search(r"(\d{4})/(\d{1,2})/(\d{1,2})\s*\((.+)\)", date_text)
        if date_match:
            try:
                year, month, day, weekday = date_match.groups()
                # yyyymmdd形式に整形(月と日をゼロ埋め)
                date = year + month.zfill(2) + day.zfill(2)
            except Exception as e:
                print(f"日付フォーマットエラー: {e}")
                return None, None
    else:
        date, weekday = None, None

    return date, weekday


def data_parser(soup):
    """
    HTMLから発走時刻、距離、天候、馬場状態を取得する関数。

    :param soup: BeautifulSoupオブジェクト
    :return: (start_time(HHMM形式), course_length, [surface, course_direction, weather, track_condition])
    """
    # RaceDataクラスを持つdivタグを取得して連結
    elems = soup.find_all("div", class_=re.compile(r"RaceData"))
    data_html = "".join(str(e) for e in elems)
    soup2 = BeautifulSoup(data_html, "lxml")
    elems = soup2.find_all("span")

    # 初期値の設定
    start_time = surface = course_length = course_direction = weather = (
        track_condition
    ) = None

    # spanタグの内容をインデックスで判定し、必要情報を取得
    for i, e in enumerate(elems):
        text = e.text.strip()

        # 1番目のspan: 発走時間(HH:MM)
        if i == 0:
            match = re.search(r"(\d{1,2}):(\d{1,2})", text)
            if match:
                # HHMM形式に変換(ゼロ埋め)
                start_time = f"{match.group(1).zfill(2)}{match.group(2).zfill(2)}"

        # 2番目のspan: 馬場の種類・距離・回り(例:芝1600m(右))
        elif i == 1:
            match = re.search(r"(\D+)(\d+)m\((.+)\)", text)
            if match:
                surface = match.group(1).strip()  # 馬場の種類(例:芝、ダート)
                course_length = match.group(2)  # 距離(m)
                course_direction = match.group(3)  # コース回り(右、左、直線)

        # 3番目のspan: 天候(例:晴)
        elif i == 2:
            if text and text.split():
                weather = text.split()[0]
            else:
                weather = None

        # 5番目のspan: 馬場状態(例:良、不良)
        elif i == 4:
            track_condition = text

    # 馬場種類・コース回り・天候・馬場状態をリストで返却
    return (
        start_time,
        course_length,
        course_direction,
        surface,
        weather,
        track_condition,
    )


def race_class_parser(soup):
    """
    HTMLからレースクラス名を取得する関数。

    :param soup: BeautifulSoupオブジェクト
    :return: レースクラス名、またはそれを含むテキスト
    """
    race_grades = ["GIII", "GII", "GI"]
    try:
        race_class = soup.find("span", class_=re.compile("Icon_GradeType")).text
        if race_class not in race_grades:
            raise ValueError

    except Exception:
        race_class = (
            soup.find("div", class_="RaceHeader_Value_Others").find("span").text.strip()
        )

    return race_class

parse_race_info 関数の概要

  • parse_race_info 関数は、レース情報ページのHTMLを解析し、pandas.DataFrame に変換。
  • BeautifulSoup を使用してHTMLを解析。
  • race_class, date, weekday, weather, start_time, track_condition, course_length, course_direction, surface などの情報を抽出。
  • save_data_or_error() を用いて正常データとエラーデータを管理。

各種データの抽出

  • date_parser(): レース開催日 (yyyymmdd) と曜日を取得。
  • data_parser(): 発走時刻 (HHMM)、馬場状態、コース情報、天候などを取得。
  • race_class_parser(): レースクラス (G1, G2, G3, 新馬 など) を解析。

データの整理と保存

  • カラムを適切に整形し、データフレームに格納。
  • set_race_id() でレースIDを設定。
  • save_data_or_error() を用いてデータの正常性を確認し、適切な場所に保存。

parse_return.py

ソースコード

from io import StringIO

import pandas as pd
from bs4 import BeautifulSoup

from .utils import save_data_or_error, set_race_id


def parse_race_return(resp, soup, data_file, error_file):
    try:
        soup = BeautifulSoup(resp.text, "lxml")
        try:
            dfs = pd.read_html(resp.text)
        except IndexError:
            dfs = [
                pd.read_html(StringIO(str(t)))[0]
                for t in soup.select("table:has(tr td)")
            ]
        # dfsの1番目に単勝〜馬連、2番目にワイド〜三連単がある
        df = dfs[1]
        set_race_id(df, data_file)
    except Exception as e:
        print(e)
        df = None

    save_data_or_error(df, resp, data_file, error_file)

parse_race_return 関数の概要

  • parse_race_return 関数は、レース払戻情報のHTMLを解析し、pandas.DataFrame に変換。
  • BeautifulSoup を使用してHTMLを解析。
  • pandas.read_html() を利用してHTMLテーブルを抽出。
  • save_data_or_error() を用いて正常データとエラーデータを管理。

レース払戻テーブルの抽出

  • pd.read_html() を用いてHTMLテーブルを取得。
  • 1番目のテーブルに単勝〜馬連、2番目のテーブルにワイド〜三連単が格納されている。
  • set_race_id() を使用し、データにレースIDを設定。

データの整理と保存

  • pandas.DataFrame に払戻データを格納。
  • set_race_id() でレースIDを設定。
  • save_data_or_error() を用いてデータの正常性を確認し、適切な場所に保存。

requests_helper.py

ソースコード

import requests


def get_resp(url, headers, max_attempts=5):
    """
    URL から HTML レスポンスを取得する関数。

    :param url: 取得するURL
    :param headers: リクエストヘッダー(User-Agent)
    :param max_attempts: 最大試行回数(デフォルト: 5)
    :param wait_time: リクエストの間隔(デフォルト: 0.5秒)
    :return: レスポンスオブジェクト or None(失敗時)
    """
    for _ in range(max_attempts):
        try:
            resp = requests.get(url, headers=headers, timeout=10)
            if resp.status_code == 200:
                resp.encoding = "EUC-JP"
                return resp
        except requests.RequestException:
            continue  # エラー時はリトライ
    return None

get_resp 関数の概要

  • get_resp 関数は、指定されたURLからHTMLレスポンスを取得します。
  • requests.get() を使用してHTTPリクエストを実行。
  • 最大5回のリトライを行い、成功すればレスポンスを返す。
  • 失敗した場合は None を返す。

HTTPリクエストの実行

  • headers を指定し、適切な User-Agent を送信。
  • timeout=10 を設定し、リクエストのタイムアウトを制御。
  • requests.RequestException をキャッチし、エラー時にはリトライ。

レスポンスの処理

  • ステータスコード 200 の場合のみレスポンスを返す。
  • resp.encoding = "EUC-JP" により、エンコーディングを適切に設定。
  • 最大試行回数に達しても取得できない場合は None を返す。

UI_helper.py

ソースコード

"""
ui_helper.py
GUI の進捗表示やメッセージ出力を統一的に扱うユーティリティモジュール
"""


def update_progress(master, count, total):
    """
    進捗を通知(GUIがあれば progressbar を使用、なければ print())

    :param master: GUIオブジェクト(`progressbar` を持つ)
    :param count: 現在の進捗
    :param total: 合計数
    """
    progress = count / total if total > 0 else 0  # ゼロ除算回避
    if master:
        master.progressbar.set(progress)
        master.progressbar.update()
    else:
        print(f"Progress: {count}/{total} ({progress:.2%})")


def print_message(master, message, line_brake=True):
    """
    メッセージを通知(GUIがあれば msgbox.print() を使用、なければ print())

    :param master: GUIオブジェクト(`msgbox` を持つ)
    :param message: 表示するメッセージ
    """
    if master:
        master.msgbox.print(message, line_brake)
    else:
        print(message)

update_progress 関数の概要

  • progressbar を用いたGUIでの進捗表示。
  • CLI環境では print() を用いて進捗率を表示。
  • count / total を計算し、進捗状況をパーセンテージで表示。
  • total = 0 の場合のゼロ除算を回避。

print_message 関数の概要

  • GUI環境では msgbox.print() を使用。
  • CLI環境では print() を使用。
  • line_brake フラグにより改行の有無を制御可能。

utils.py

ソースコード

import pickle
from collections import defaultdict


def save_data_or_error(df, resp, data_file, error_file):
    """
    正常データは保存し、エラー発生時はエラーデータを保存する。

    :param df: 解析されたデータフレーム(Noneならエラー)
    :param resp: 取得したHTMLレスポンスデータ
    :param data_file: 正常データの保存パス
    :param error_file: エラーデータの保存パス
    """
    try:
        if df is not None:
            with open(data_file, "wb") as f:
                pickle.dump(df, f)
        else:
            with open(error_file, "wb") as f:
                pickle.dump(resp, f)
    except Exception as e:
        print(f"保存時にエラー発生: {e} - {data_file.stem if df else error_file.stem}")


def set_race_id(df, data_file):
    """
    DataFrame の先頭にレース ID カラムを追加する。

    :param df: DataFrame オブジェクト
    :param data_file: 保存するファイルのパス(レース ID を取得するため)
    """
    # ファイル名からレースIDを取得(例: 202307010510_result → 202307010510)
    race_id = data_file.stem.split("_")[0]  # レースIDを抽出

    # DataFrameの先頭に 'race_id' カラムを追加(全行に同じレースIDを設定)
    df.insert(0, "race_id", race_id)


def remove_duplicate_errors(key, data_dir, error_dir):
    """
    正常データとエラーデータを比較し、同じレースIDのエラーファイルを削除する。

    :param data_dir: 正常データの保存ディレクトリ
    :param error_dir: エラーデータの保存ディレクトリ
    """
    data_files = {f.stem.split("_")[0] for f in data_dir.iterdir()}
    error_files = {f.stem.split("_")[0] for f in error_dir.iterdir()}

    for race_id in data_files.intersection(error_files):
        error_file = error_dir / f"{race_id}_{key}_error.pkl"
        if error_file.exists():
            error_file.unlink()
            print(f"{race_id} のエラーファイルを削除しました。")


def organize_years(ym_list):
    """
    年月リスト ([yyyy, mm] のリスト) から、年ごとに整理された [yyyymm] のリストを作成する。

    :param ym_list: [[yyyy, mm], [yyyy, mm], ...] のリスト
    :return: (years, grouped_by_year)
        - years: [yyyy, yyyy, ...] の 1次元リスト(ユニークな年)
        - grouped_by_year: [[yyyymm, yyyymm], [yyyymm, ...]] の 2次元リスト(年ごとに整理)
    """
    year_dict = defaultdict(list)
    for year, month in ym_list:
        yyyymm = f"{year}{str(month).zfill(2)}"
        year_dict[year].append(yyyymm)

    years = sorted(year_dict.keys())
    grouped_by_year = [year_dict[year] for year in years]

    return years, grouped_by_year

save_data_or_error 関数の概要

  • 正常データを pickle で保存。
  • 解析エラーが発生した場合、レスポンスデータをエラーファイルとして保存。
  • 例外処理を実装し、保存時のエラーをキャッチして適切にログ出力。

set_race_id 関数の概要

  • DataFrame の先頭に race_id カラムを追加。
  • 保存ファイルのファイル名からレースIDを取得し、全行に適用。
  • データの一貫性を保つため、レースIDの統一管理を実施。

remove_duplicate_errors 関数の概要

  • 正常データとエラーデータを比較し、重複したエラーファイルを削除。
  • data_dir 内のデータを取得し、同じレースIDが存在するエラーファイルを特定。
  • unlink() を使用して不要なエラーファイルを削除し、ストレージを最適化。

organize_years 関数の概要

  • 年月のリストを整理し、年ごとに分類。
  • defaultdict(list) を使用し、年単位でデータをグループ化。
  • 各年の yyyymm のリストを生成し、データ処理を簡単に。
よかったらシェアしてね!
  • URLをコピーしました!
  • URLをコピーしました!

この記事を書いた人

コメント

コメントする

目次