両学長基準の高配当株スクリーニング自動化|Python並列処理+LINE通知の使い方

Chelsea-Labs #15 サムネイル

免責事項

本記事は投資助言を目的としたものではなく、技術・分析手法の紹介です。記事中のコード・スクリーニング結果は教育目的であり、特定の銘柄・金融商品の売買を推奨するものではありません。投資判断はご自身の責任で行ってください。本記事中に J-Quants API から取得した実データは掲載していません(利用規約に基づく方針、詳細は 記事#13)。LINE Messaging API・GitHub Actions・各クラウドサービスの利用規約・料金プランは変更される可能性があるため、実装時は公式ドキュメントを必ず確認してください(本記事の数値は2026年5月時点の参考値)。

前回の記事#14では、J-Quants と EDINET から取得した財務データを DuckDB で統合し、業種別補正に対応するデータマート(v_screening_input ビュー)を完成させました。今回はその上に乗せる形で、「東証上場約3,900社の高配当株スクリーニングを Python で自動化する」本格的なパイプラインを構築します。本記事は応用編で扱う「自動化の使い方」の集大成です。

応用編 #11〜#14 で「設計(基準・データソース・統合)」を整えてきました。本記事は応用編のPhase 3 分析の本番で、これまでのパイプラインが「動いている自動化システム」として完成する回です。両学長スクリーニング基準を Python で再現する目標が、ここで一つの形になります。

多くの読者が壁にぶつかる箇所は、次の3つです:

  • 業種補正の自動切替: direction カラム(higher_better / lower_better / range)に応じて判定ロジックを切り替える。手で書くと if 分岐だらけになる
  • 処理時間の壁: 全銘柄を直列で判定すると数十分かかる場合がある。Python の multiprocessing で並列化することで現実的な時間に収める
  • 結果の通知設計: PASS 銘柄が10件出ても見ない人が多い。LINE 通知で「今日のPASS銘柄N件」を毎朝届けることで、自動化が日常運用に組み込める

筆者は製造業の開発現場で、製品検査の全数自動化と並列化を多く経験してきました。「1個ずつ検査すると1日100台しか出荷できないが、10並列で動かせば1,000台/日になる」という並列化の発想は、投資データのスクリーニングでもそのまま機能します。本業の検査自動化と、本記事の銘柄スクリーニング自動化は、「N個の独立した判定を最短時間で並列実行する」という構造で完全に重なります。

本記事では、応用編 #11/#12 で導入した USL/LSL(Upper/Lower Spec Limit、規格上限・下限)の枠組みを継承する形で、spec_sheet_judge_v2.py(業種補正対応の判定器)を実装します。さらに、multiprocessing.Pool で並列処理LINE Messaging API で通知連携、最後に cron / GitHub Actions で日次バッチ自動化のスケルトンまで含めて、5個のスニペットでパイプライン全体を組み上げます。

本記事の前提と難易度

  • Python の標準ライブラリ(multiprocessingargparselogging)の経験があると進めやすい
  • 記事 #11〜#14 のパイプライン(取得 → 統合 → データマート)が完了している前提
  • LINE Messaging API のチャネル発行(5分で完了)が必要。LINE Notify は2025年3月に提供終了したため、本記事は Messaging API への移行版手順
  • GitHub Actions セクションは GitHub アカウントのみ必要(プライベートリポジトリ無料枠で十分)
  • 本記事のパイプラインが完成すると、応用編シリーズの「両学長スクリーニング自動化」のミニマム版が稼働する状態になります

「実装したくない読者」向け代替案

本記事は Python 実装による自動化を扱いますが、「自動化は要らない、まず使ってみたい」という読者には、SBI証券・楽天証券・マネックス証券の無料スクリーニングツールでの実行をおすすめします。両学長基準(配当利回り4%以上・配当性向30〜60%・自己資本比率40%以上など)は、これらの証券会社の Web スクリーニングで条件指定して即実行できます。本記事の自動化は「毎朝6時に最新結果が LINE に届く運用」が目的で、その運用ニーズが無い場合は無料ツールが最短ルートです。本シリーズで設計した6基準を理解した上であれば、いずれの方法でも結果に大差はありません。

結論:高配当株スクリーニング自動化は、「業種補正対応の判定器 × multiprocessing 並列化 × LINE 通知 × cron/Actions スケジューリング」の4要素で完成する。本記事のサンプルコード5本を動かせば、毎朝6時に「今日のPASS銘柄一覧」が LINE に届く運用が始まる。製造業の全数検査自動化と同じ思想で、判定の独立性・並列性・通知優先度の3点を最初に設計するのが鍵。継続コストは年間約2万円で個人投資家の自動化として現実的な範囲。

目次

高配当株スクリーニングパイプラインの全体像(5要素構成)

┌─────────────────────────── データマート(記事#14 で完成)──────────────────────────┐
│  v_screening_input ビュー                                                          │
│  (ticker, industry_profile, yield_pct, payout, equity_ratio, ... + direction)  │
└──────────────────────────────────┬─────────────────────────────────────────────┘
                                   │
                                   ▼
┌─────────────────────────── 判定層(本記事の主題)──────────────────────────────┐
│  ① spec_sheet_judge_v2.py                                                         │
│     direction(higher_better / lower_better / range)対応の業種補正版判定器      │
│                                                                                   │
│  ② スクリーニング Runner                                                          │
│     全銘柄を取得 → 並列判定 → 結果集約 → 出力                                    │
│                                                                                   │
│  ③ multiprocessing.Pool 並列処理(initializer パターンで spawn 対応)              │
│     N個の銘柄を CPU コア数だけ並列で判定(処理時間 1/N に短縮)                   │
│                                                                                   │
│  ④ 出力(CSV / Parquet)                                                          │
│     daily/2026-05-07.csv: PASS/CAUTION/FAIL の判定結果一覧                        │
└──────────────────────────────────┬─────────────────────────────────────────────┘
                                   │
                                   ▼
┌─────────────────────────── 通知・運用層 ────────────────────────────────────────┐
│  ⑤ LINE Messaging API: 「本日 PASS N 件」を朝6時に通知                            │
│     cron / GitHub Actions: 毎日定刻に Runner を起動 + 失敗通知                    │
└────────────────────────────────────────────────────────────────────────────────┘

エンジニア的に言い換えると(製造業の全数検査自動化との対応)

このパイプラインの設計は、製造業 DX で言う 「全数検査ライン × セル生産方式の並列化 × 異常通知 × 定時運用」のミニマム構成そのものです:

  • 判定層 = 検査機(製品スペックシートに従って合否を出す、USL/LSL の自動切替対応)
  • 処理層 = セル生産方式の並列ライン(複数の検査セルが独立稼働 = embarrassingly parallel、並列化が容易)
  • 通知層 = 異常・抽出通知システム(PASS銘柄通知、本業では「異常品の通知」、構造は同じ)
  • 運用層 = 生産シフト管理(毎朝6時に検査ラインを起動)

応用編 #11/#12 のスペックシートが「個別判定の設計」だったのに対し、本記事は「判定を生産ラインとして組み立てる」段階。応用編全体で扱ってきた「製品スペックシート × FMEA × MDM × ELT × キッティング工程」のメタファーが、本記事で1つの自動化システムに結実します。

PASS が出てから「実際に買う」までのギャップを必ず意識する

機械フィルタの限界 — PASS は「買い候補」ではない

本記事のスクリーニングは 「両学長基準に機械的に合致する銘柄を抽出する」ところまでが範囲です。PASS が出た銘柄をそのまま「買い候補」と読むのは誤読で、両学長スタイルでも以下4点の定性調査が必須になります:

  • 事業内容の理解: その会社が何で稼いでいるか・主要顧客・競合状況・業界の構造変化に対する耐性
  • 競争優位性: 参入障壁・ブランド力・ネットワーク効果など、持続的に高配当を維持できる根拠
  • 株価水準の妥当性: PER・PBR・過去5年レンジ内での現在位置(応用編 #18 で扱う)
  • ポートフォリオ全体との整合: 業種分散・既存保有銘柄との相関(応用編 #19 で扱う)

本記事の自動化は「定量フィルタ」を全自動化することで、定性調査に時間を使えるようにするためにあります。PASS が10件出れば、その10件に対してじっくり定性調査をする、という運用前提です。

PASS 件数別の意味解釈

PASS 件数意味取るべきアクション
0〜数件市場全体が割高 or 配当条件が厳しすぎ閾値を緩めるか、相場局面を再評価。配当利回りを 3.5% に下げる等
10〜30件適切な抽出範囲(推奨)定性調査を時間配分して進める
50件超条件が緩すぎ or 業種補正に問題業種別に件数を分解 → 偏りを確認、業種補正テーブル見直し
100件超機械的フィルタの限界、判定基準を見直すEPS/連続増配の閾値強化、追加指標の導入

通知頻度の選択肢(日次/週次/月次)

本記事の例では「毎朝6時の日次通知」を採用していますが、これは長期保有・FIRE志向の読者には頻度が高すぎる場合があります。日次更新が必要なのは「短期トレード視点で日々の市場変動を追う」ユースケースで、長期保有目的なら週次(月曜朝)または月次(月初)通知のほうが自然です。cron の頻度を 0 6 * * 1(週次)や 0 6 1 * *(月次)に変更するだけで切替可能です。

スニペット1:業種補正対応の判定器サンプルコード spec_sheet_judge_v2.py

記事 #12 で実装した spec_sheet_judge.py は静的な閾値で判定していました。本記事では industry_indicator_map テーブルから動的に閾値・direction を読み込み、業種ごとに USL/LSL を切り替えて判定する v2 を実装します。v2 では新たに direction="range" をサポートし、利回り(4-8%が PASS)のように両側仕様の指標もテーブル駆動で扱えるようにします。

# spec_sheet_judge_v2.py — 業種補正対応の判定器
# 動作環境: Python 3.11+ / 標準ライブラリのみ
from typing import Literal, TypedDict, Callable

Verdict = Literal["PASS", "CAUTION", "FAIL"]

class IndustryRule(TypedDict):
    direction: Literal["higher_better", "lower_better", "range"]
    threshold_pass: float
    threshold_caution: float
    # range 指標用(オプション)
    range_high_pass: float | None     # range の上限 PASS
    range_high_caution: float | None  # range の上限 CAUTION(超過 FAIL)

def make_judge(rule: IndustryRule) -> Callable[[float | None], Verdict]:
    """業種別ルール(threshold + direction)から判定関数を組み立てる

    direction の3パターン:
    - higher_better: 値 >= threshold_pass で PASS(LSL 設計)
    - lower_better : 値 <= threshold_pass で PASS(USL 設計)
    - range        : threshold_pass <= 値 <= range_high_pass で PASS(両側)
    """
    p = rule["threshold_pass"]
    c = rule["threshold_caution"]
    if rule["direction"] == "higher_better":
        def judger(v: float | None) -> Verdict:
            if v is None: return "CAUTION"
            if v >= p: return "PASS"
            if v >= c: return "CAUTION"
            return "FAIL"
    elif rule["direction"] == "lower_better":
        def judger(v: float | None) -> Verdict:
            if v is None: return "CAUTION"
            if v <= p: return "PASS"
            if v <= c: return "CAUTION"
            return "FAIL"
    else:  # range(4-8%の利回り等の両側仕様)
        hp = rule.get("range_high_pass") or float("inf")
        hc = rule.get("range_high_caution") or float("inf")
        def judger(v: float | None) -> Verdict:
            if v is None: return "CAUTION"
            if p <= v <= hp: return "PASS"
            if v < p or hp < v <= hc: return "CAUTION"
            return "FAIL"
    return judger

def judge_v2(metrics: dict, industry_rules: dict[str, IndustryRule]) -> dict:
    """6基準を業種補正適用のうえ判定する(テーブル駆動、利回りも含む)

    metrics は v_screening_input の1行(dict)
    industry_rules は industry_indicator_map から銘柄の industry_profile に対応するルール集
    """
    per_metric: dict[str, Verdict] = {}
    # 利回り・配当性向・自己資本比率の3指標は industry_indicator_map で完全テーブル駆動
    for indicator in ("yield_pct", "payout", "equity_ratio"):
        rule = industry_rules.get(indicator)
        if rule is None:
            per_metric[indicator] = "CAUTION"  # ルール未定義は CAUTION
            continue
        per_metric[indicator] = make_judge(rule)(metrics.get(indicator))

    # 時系列系(eps_trend, consec_inc_years, ocf_positive_years)は応用編 #16 で精緻化
    # それまでは「値があれば PASS」ではなく「常に CAUTION」で誤PASSを防ぐ
    for k in ("eps_trend", "consec_inc_years", "ocf_positive_years"):
        per_metric[k] = "CAUTION"  # 応用編 #16 で時系列分析実装後に PASS/FAIL に分岐

    overall: Verdict = "FAIL" if "FAIL" in per_metric.values() else \
                       "CAUTION" if "CAUTION" in per_metric.values() else "PASS"
    return {"per_metric": per_metric, "overall": overall}

# 使用例(高配当株向け、industry_indicator_map から取得した想定)
# rules_manufacturing = {
#     "yield_pct":    {"direction": "range", "threshold_pass": 4.0, "threshold_caution": 3.0,
#                      "range_high_pass": 8.0, "range_high_caution": 12.0},
#     "payout":       {"direction": "lower_better", "threshold_pass": 60, "threshold_caution": 80,
#                      "range_high_pass": None, "range_high_caution": None},
#     "equity_ratio": {"direction": "higher_better","threshold_pass": 40, "threshold_caution": 30,
#                      "range_high_pass": None, "range_high_caution": None},
# }
# m = {"yield_pct": 4.5, "payout": 45, "equity_ratio": 50}
# print(judge_v2(m, rules_manufacturing))

USL/LSL/range の3パターン自動切替(応用編 #11/#12 用語の継承)

make_judge() 関数の中で direction を見て分岐するのが、応用編 #11/#12 で導入した USL/LSL 設計の Python 実装です:

  • higher_better → LSL(値が threshold_pass 以上で PASS): 自己資本比率・BIS比率・格付けスコア
  • lower_better → USL(値が threshold_pass 以下で PASS): LTV・配当性向
  • range → USL+LSL の組み合わせ(v2 追加): 配当利回り(4〜8% で PASS、それ以外は CAUTION/FAIL)

この1関数で業種補正の方向性が吸収できるため、新しい業種を追加する際も industry_indicator_map に行を追加するだけで対応可能。製造業の検査機が「公差上限・下限」両方に対応しているのと、構造的に完全に同じです。

スニペット2:全銘柄スクリーニング Runner(DuckDB → 判定 → 出力)

判定器ができたら、全銘柄に適用する Runner を組みます。v_screening_input ビューから全銘柄のメトリクスを SELECT し、industry_indicator_map から業種別ルールを読み、各銘柄を judge_v2 で判定して結果を CSV と Parquet で出力します。

# screen_all.py — 全銘柄スクリーニング Runner(直列版、次のスニペットで並列化)
# 動作環境: Python 3.11+ / duckdb 0.10+ / pandas 2.x / pyarrow 14+
import duckdb
import pandas as pd
from datetime import date
from pathlib import Path
from spec_sheet_judge_v2 import judge_v2, IndustryRule

DB_PATH = Path("data/stocks.duckdb")
OUT_DIR = Path("data/screening_results")

def load_industry_rules(conn: duckdb.DuckDBPyConnection) -> dict[str, dict[str, IndustryRule]]:
    """{industry_profile: {indicator: IndustryRule}} の二段 dict で返す"""
    rows = conn.execute("""
        SELECT industry_profile, standard_indicator, direction, threshold_pass, threshold_caution
        FROM industry_indicator_map
    """).fetchall()
    rules: dict[str, dict[str, IndustryRule]] = {}
    for prof, ind, dirn, p, c in rows:
        rules.setdefault(prof, {})[ind] = {
            "direction": dirn, "threshold_pass": p, "threshold_caution": c,
            "range_high_pass": None, "range_high_caution": None,
        }
    return rules

def run_screening_serial() -> tuple[Path, Path]:
    """全銘柄を直列で判定(並列化前のベースライン実装)"""
    OUT_DIR.mkdir(parents=True, exist_ok=True)
    today = date.today().isoformat()
    out_csv = OUT_DIR / f"{today}.csv"
    out_parquet = OUT_DIR / f"{today}.parquet"

    with duckdb.connect(str(DB_PATH)) as conn:
        rules_by_profile = load_industry_rules(conn)
        df = conn.execute("SELECT * FROM v_screening_input").fetchdf()

    # iterrows() は遅いので to_dict(orient="records") で dict 化(2-3倍高速)
    metrics_list = df.to_dict(orient="records")
    results = []
    for metrics in metrics_list:
        profile = metrics.get("industry_profile") or "manufacturing"
        rules = rules_by_profile.get(profile, rules_by_profile.get("manufacturing", {}))
        verdict = judge_v2(metrics, rules)
        results.append({
            "ticker_normalized": metrics["ticker_normalized"],
            "name_jp": metrics.get("name_jp"),
            "industry_profile": profile,
            "overall": verdict["overall"],
            "per_metric": str(verdict["per_metric"]),
        })

    result_df = pd.DataFrame(results)
    result_df.to_csv(out_csv, index=False)
    result_df.to_parquet(out_parquet, index=False)
    print(f"saved: {out_csv} / {out_parquet} ({len(result_df)} rows)")
    return out_csv, out_parquet

if __name__ == "__main__":
    run_screening_serial()

この直列版は、銘柄数が数百なら数秒で動きます。しかし、東証上場約3,900社を直列で処理すると数十秒〜分単位の時間がかかる場合があり、スクリーニング基準を試行錯誤するときに不便です。次のスニペットで multiprocessing による並列化を実装します。

スニペット3:multiprocessing.Pool で並列高速化する(spawn 対応版)

銘柄ごとの判定は完全に独立しており、相互依存がありません。これは並列化の理想形(embarrassingly parallel、並列化が容易な問題)で、製造業のセル生産方式(独立したセルが並列に動く)と同じ構造です。multiprocessing.Pool を使えば CPU コア数倍の高速化が見込めます。

形式の罠:multiprocessing の spawn 対応(macOS / Windows 必須)

  • macOS(Python 3.8+)と Windows のデフォルト start method は spawn(fork ではない)。spawn 環境では親プロセスのグローバル変数は子プロセスに引き継がれない
  • v1 でグローバル変数 _RULES_BY_PROFILE に依存していたのは大きな実装ミスで、macOS/Win では rules が空のまま判定されていた。v2 では Pool(initializer=, initargs=) パターンに修正
  • Linux のデフォルトは fork のため v1 でも動いていたが、本番環境(GitHub Actions の ubuntu-latest)と開発環境(macOS)で挙動が分かれる、典型的な「動かないが原因が見えづらい」バグ
# screen_parallel.py — multiprocessing.Pool で全銘柄並列判定(v2: spawn 対応版)
# 動作環境: Python 3.11+ / duckdb 0.10+ / pandas 2.x / pyarrow 14+
import duckdb
import pandas as pd
from multiprocessing import Pool, cpu_count
from datetime import date
from pathlib import Path
from spec_sheet_judge_v2 import judge_v2

DB_PATH = Path("data/stocks.duckdb")
OUT_DIR = Path("data/screening_results")

# モジュールレベル変数(worker ごとに initializer で個別にセットされる)
_RULES_BY_PROFILE: dict = {}

def _init_worker(rules_by_profile: dict) -> None:
    """各 worker プロセスの起動時に1回だけ呼ばれる(spawn でも fork でも動作する)"""
    global _RULES_BY_PROFILE
    _RULES_BY_PROFILE = rules_by_profile

def judge_one(metrics: dict) -> dict:
    """1銘柄分の判定(worker から呼ばれる単位)"""
    rules_by_profile = _RULES_BY_PROFILE
    profile = metrics.get("industry_profile") or "manufacturing"
    rules = rules_by_profile.get(profile, rules_by_profile.get("manufacturing", {}))
    verdict = judge_v2(metrics, rules)
    return {
        "ticker_normalized": metrics["ticker_normalized"],
        "name_jp": metrics.get("name_jp"),
        "industry_profile": profile,
        "overall": verdict["overall"],
        "per_metric": str(verdict["per_metric"]),
    }

def load_rules_in_main() -> dict:
    """メインプロセスで rules を読み込み、initargs として子プロセスに渡す"""
    rules: dict = {}
    with duckdb.connect(str(DB_PATH)) as conn:
        rows = conn.execute("""
            SELECT industry_profile, standard_indicator, direction, threshold_pass, threshold_caution
            FROM industry_indicator_map
        """).fetchall()
        for prof, ind, dirn, p, c in rows:
            rules.setdefault(prof, {})[ind] = {
                "direction": dirn, "threshold_pass": p, "threshold_caution": c,
                "range_high_pass": None, "range_high_caution": None,
            }
    return rules

def run_screening_parallel(n_workers: int | None = None) -> Path:
    """全銘柄を並列で判定(worker 数はデフォルトで CPU コア数)

    Pool(initializer=...) で各 worker に rules を初回1回だけセットすることで、
    spawn 環境でもグローバル変数が確実に引き継がれる。
    """
    OUT_DIR.mkdir(parents=True, exist_ok=True)
    today = date.today().isoformat()
    out_parquet = OUT_DIR / f"{today}.parquet"

    rules = load_rules_in_main()
    with duckdb.connect(str(DB_PATH)) as conn:
        df = conn.execute("SELECT * FROM v_screening_input").fetchdf()
    metrics_list = df.to_dict(orient="records")  # iterrows より高速

    n = n_workers or cpu_count()
    with Pool(processes=n, initializer=_init_worker, initargs=(rules,)) as pool:
        results = pool.map(judge_one, metrics_list, chunksize=50)

    result_df = pd.DataFrame(results)
    result_df.to_parquet(out_parquet, index=False)
    print(f"workers={n}, processed={len(result_df)}, saved={out_parquet}")
    return out_parquet

if __name__ == "__main__":
    run_screening_parallel()

タイミングの罠:並列化のパフォーマンス特性

  • I/O bound vs CPU bound: 本判定処理は CPU bound(純粋な計算)。multiprocessing.Pool が有効。asynciothreading は GIL(Global Interpreter Lock、Python のグローバルインタプリタロック)の制約で CPU bound には効かない
  • chunksize の調整: 銘柄ごとの判定が軽量(マイクロ秒オーダー)なため、chunksize=50 程度で worker 間のオーバーヘッドを吸収。1だとプロセス間通信のコストが処理時間を上回る
  • initializer による rules 共有: Pool(initializer=_init_worker, initargs=(rules,)) で worker プロセス起動時に1回だけ rules を渡す。各 worker プロセスは _RULES_BY_PROFILE をローカル定数として保持(プロセス間共有ではなくプロセス内キャッシュ)
  • 3,900銘柄の現実的な所要時間: 直列で30秒の処理が、4コアで約8秒、8コアで約4秒に短縮(実機構成・I/O コスト次第)

エンジニア的に言い換えると(セル生産方式の3項対応)

製造業のセル生産方式では、ライン式と異なり「独立したセル(小規模ユニット)」が並列に動きます。Python の multiprocessing.Pool はまさに同じ発想:

  • 1セル = 1 worker プロセス: 銘柄判定を独立して処理
  • セル間の独立性 = 銘柄間にデータ依存がない: 応用編 #14 の3層分離設計が並列化を可能にした
  • セル数 = CPU コア数: 物理的な並列度の上限が見える

本記事の判定処理が「embarrassingly parallel(並列化が容易な問題)」と言えるのは、銘柄間にデータ依存がない設計を #14 の段階で確立したからです。設計が並列化を可能にし、並列化が運用速度を可能にする、という連鎖です。

スニペット4:LINE Messaging API で結果を通知する(始め方の手順付き)

LINE Notify 廃止後の移行先 — Messaging API への切替

LINE Notify は2025年3月末で提供終了しました。それ以前に Notify を使っていた方は LINE Messaging API への移行が必要です。本記事の方法は Messaging API 対応版で、Notify から移行する読者にもそのまま使えます。Messaging API は月200通までは無料(2024年4月のコミュニケーションプラン改定後)、それを超える場合は段階的な課金プランがあります。日次1通でも年間365通になり2ヶ月目から課金プラン(200通超分)が必要なため、コスト計算には注意が必要です。

LINE Messaging API のセットアップ手順(始め方)

  1. LINE Developers コンソールにログイン → 「プロバイダー」を作成(任意の名前)
  2. プロバイダー内に「Messaging API チャネル」を作成(チャネル名・説明・カテゴリを入力)
  3. 作成したチャネルの「Messaging API 設定」タブで「チャネルアクセストークン(長期)」を発行 → LINE_TOKEN として保存
  4. 同タブの QR コードを LINE で読み取って友だち追加(自分のアカウントで追加)
  5. 「ターゲット User ID」の取得には2通りの方法:
    • 方法A: 「Basic settings」タブの「Your user ID」を確認(自分の Bot に対する自分の User ID として使う場合)
    • 方法B: Webhook を有効化して、友だち追加時の follow イベントから userId を抽出(複数ユーザーへの配信時に必要)
  6. 取得した User ID を LINE_USER_ID として保存
# notify_line.py — LINE Messaging API でスクリーニング結果を通知
# 動作環境: Python 3.11+ / requests 2.31+
import os
import requests
import pandas as pd
from datetime import date
from pathlib import Path

LINE_PUSH_URL = "https://api.line.me/v2/bot/message/push"
RESULTS_DIR = Path("data/screening_results")

def build_message(result_df: pd.DataFrame) -> str:
    """PASS 銘柄をリスト化したメッセージを組み立てる(一般語化済)"""
    pass_df = result_df[result_df["overall"] == "PASS"]
    today = date.today().isoformat()
    lines = [f"📊 {today} 高配当株スクリーニング結果", f"PASS: {len(pass_df)} 件"]
    if len(pass_df) > 0:
        # 上位10件のみ表示(過剰通知防止、過剰通知化したら閾値見直しのトリガー)
        for _, row in pass_df.head(10).iterrows():
            lines.append(f"  ・{row['ticker_normalized']} {row.get('name_jp','')}")
        if len(pass_df) > 10:
            lines.append(f"  ...他 {len(pass_df) - 10} 件(CSV/Parquet を確認)")
    lines.append(f"CAUTION: {(result_df['overall']=='CAUTION').sum()} 件")
    lines.append(f"FAIL: {(result_df['overall']=='FAIL').sum()} 件")
    lines.append("※ PASS は機械的フィルタ通過のみを意味し、買い推奨ではありません。事業内容・株価水準・分散の定性調査が必要です。")
    return "\n".join(lines)

def push_to_line(text: str, to_user_id: str, access_token: str) -> None:
    """LINE Messaging API push で1ユーザーに送信"""
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
    }
    payload = {"to": to_user_id, "messages": [{"type": "text", "text": text}]}
    resp = requests.post(LINE_PUSH_URL, json=payload, headers=headers, timeout=30)
    resp.raise_for_status()

def notify_today() -> None:
    today = date.today().isoformat()
    parquet_path = RESULTS_DIR / f"{today}.parquet"
    if not parquet_path.exists():
        print(f"結果ファイル未生成: {parquet_path}")
        return
    df = pd.read_parquet(parquet_path)
    msg = build_message(df)
    push_to_line(
        msg,
        to_user_id=os.environ["LINE_USER_ID"],
        access_token=os.environ["LINE_TOKEN"],
    )
    print(f"sent ({len(msg)} chars)")

if __name__ == "__main__":
    notify_today()

形式の罠:通知設計の3つの落とし穴

  • 過剰通知(オオカミ少年化): PASS 銘柄が100件出ても全部通知すると無視される。上位10件+件数のみ表示で「気になったら CSV を見る」運用に
  • 認証情報の取り扱い: LINE_TOKENLINE_USER_ID も機密情報。環境変数経由のみ、ログ出力時もマスキング必須
  • レート制限と料金: LINE Messaging API は2024年4月改定後、無料枠が月200通。日次運用なら年間365通で2ヶ月目から課金プラン(コミュニケーションプラン: 月額5,500円〜)が発生。週次(年52通)または月次(年12通)に切り替えれば無料枠内で運用可能。複数ユーザーへの配信は Multicast API(最大500人/通知)の検討

スニペット5:cron / GitHub Actions で日次バッチ自動化する

最後に、Runner と通知をスケジュール起動するスケルトンを示します。Phase A(ローカル運用)では cron、Phase B 以降(GitHub Actions or Cloud Run)に移行することで完全クラウド化できます。失敗時の通知も同じ仕組みで設定することで、運用停止に気づける状態を保ちます。

# crontab -e で編集 — 毎朝6時にスクリーニング → 通知 + 失敗時メール通知
# 失敗時は MAILTO 経由で本人にメール(Postfix 等の MTA が必要)
MAILTO=your-email@example.com
0 6 * * * cd /path/to/chelsea-labs-project && \
  /usr/bin/env python scripts/screen_parallel.py >> logs/screen.log 2>&1 && \
  /usr/bin/env python scripts/notify_line.py >> logs/notify.log 2>&1 \
  || /usr/bin/env mailx -s "[chelsea-labs] screening failed" "$MAILTO" < logs/screen.log

# 週次運用版(月曜朝6時、無料枠内に収める場合)
# 0 6 * * 1 cd /path/to/chelsea-labs-project && python scripts/screen_parallel.py && python scripts/notify_line.py

# .github/workflows/screening.yml — GitHub Actions スケジュール実行(Phase B 移行版)
name: dividend-screening
on:
  schedule:
    - cron: "0 21 * * *"  # 日本時間 6:00(UTC 21:00 前日)
  workflow_dispatch:       # 手動実行も可能
jobs:
  run-screening:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.11"
      - run: pip install -r requirements.txt
      - name: Run screening
        env:
          JQUANTS_EMAIL:    ${{ secrets.JQUANTS_EMAIL }}
          JQUANTS_PASSWORD: ${{ secrets.JQUANTS_PASSWORD }}
          EDINET_API_KEY:   ${{ secrets.EDINET_API_KEY }}
          LINE_TOKEN:       ${{ secrets.LINE_TOKEN }}
          LINE_USER_ID:     ${{ secrets.LINE_USER_ID }}
        run: |
          python scripts/screen_parallel.py
          python scripts/notify_line.py
      - name: Notify on failure  # 失敗時の通知(GitHub の標準メール通知が走る + Slack/LINE 等の追加可能)
        if: ${{ failure() }}
        run: echo "::error::Screening pipeline failed - check logs"

cron 版は手元のマシンが起動している時間しか動きません。GitHub Actions 版に移行すると、PCを切ってもクラウド側で毎日実行されます。プライベートリポジトリの無料枠(月2,000分)で十分動く規模で、本格運用前にローカルで動作確認してから GitHub Actions に乗せるのが定石です。失敗通知は GitHub Actions の標準メール(Settings → Notifications で有効化)でも十分機能します。

継続運用コスト試算(年間約2万円〜)

項目料金備考
J-Quants Light1,650円/月(年 19,800円)5営業日遅延・全銘柄。応用編シリーズの全銘柄スクリーニング前提
EDINET API0円完全無料(API キー登録のみ)
LINE Messaging API0円〜5,500円/月無料枠 月200通、日次なら2ヶ月目から課金。週次運用なら無料
GitHub Actions0円プライベートリポジトリ無料枠(月2,000分)で十分
サーバ・PC0円〜cron 運用なら手元PC、GitHub Actionsならクラウド側
合計(週次運用)年 19,800円J-Quants Light のみ
合計(日次運用)年 約 60,000円J-Quants Light + LINE 課金プラン

個人投資家のスクリーニング自動化として、週次運用なら年間2万円に収まります。日次運用にする場合は LINE 課金が乗るため約6万円。コスパで考えると長期保有・FIRE志向なら週次推奨、デイトレ的な日次更新が必要な場合のみ日次運用、というのが現実的です。

プロセスFMEAでスクリーニングパイプラインの故障モード × 検出指標

応用編 #13/#14 で取得層・統合層の FMEA を整理しました。本記事の判定・通知・運用層にも、独自の故障モードがあります。

故障モード影響度検出指標事前対策
multiprocessing spawn 環境でのrules欠落致命的macOSで全銘柄が CAUTION/FAIL 化Pool(initializer=, initargs=) パターン採用、ローカルで multiprocessing.set_start_method('spawn') でテスト
業種別ルールの方向性誤適用致命的LTV 60% を「PASS」と誤判定direction カラム必須参照、make_judge でユニットテスト
過剰通知(オオカミ少年化)中〜高PASS 件数が想定以上(100件超)で読まれなくなる上位10件のみ表示+件数サマリ。閾値見直しのトリガーとして件数を監視
欠損通知(運用停止)Runner エラー時に通知が送られないcron MAILTO + GitHub Actions on: failure で運用停止検知
認証情報の漏洩致命的LINE_TOKEN / JQUANTS_PASSWORD がログ・コミットに混入環境変数 + .gitignore + GitHub Secrets で保護、ログマスキング
業種補正テーブルの陳腐化が判定に伝播BIS 規制改定後も古い閾値で判定継続四半期レビュー(記事 #14 と共通の運用作法)
夜間実行時の API レート制限J-Quants の取り直しが集中して 429取得は別バッチ(時間ずらし)、本記事の Runner はキャッシュ前提
LINE 通知の課金超過月200通の無料枠超過週次運用への切替、Multicast API への分散
PASS の誤読(買い推奨化)致命的機械フィルタが投資判断と混同される通知文末尾に「買い推奨ではない」注記、定性調査の必要性を明示

設計判断の記録:自動化パイプラインのトレードオフ

判断0:なぜこの3つの設計判断を最初に決めるか

判断1(並列化技術)はパフォーマンス、判断2(通知メディア)は到達率、判断3(スケジューラ)は永続性の3軸に対応します。これら3つを最初に固めれば、応用編 #16〜#20 の機能追加(時系列分析・罠銘柄検知・可視化など)はすべてここに乗る形で派生できます。記事 #11/#12 で導入した「設計判断の記録」セクションも、応用編全体を通じてこのパターン化された3項目で進めます。

判断1:なぜ multiprocessing で並列化するか(asyncio や Dask ではなく)

  • 採用理由: 判定処理は CPU bound なので multiprocessing が最適。asyncio は I/O bound 向け、Dask は GB 級データ向けで本記事の規模(数千行・数十秒)には過剰
  • 採用したことで失うもの: プロセス間通信のオーバーヘッド(小規模データでは threading より遅い場面もある)。具体的シナリオ: 銘柄数が500未満で I/O が重い場合は asyncio + threading のほうが速い場合もある。Phase C で銘柄数が10万級になった場合は Polars + Ray など別技術の検討余地あり

判断2:なぜ LINE 通知を最優先にするか(Slack / メールではなく)

  • 採用理由: CLAUDE.md で「LINE Messaging API 最優先」と明記。日本のユーザーが日常的にチェックする頻度・スマホへの即時性がブログ読者層と合致する
  • 採用したことで失うもの: LINE は Bot との 1on1 が前提で、複数人共有や監査ログには弱い。具体的シナリオ: 投資コミュニティで結果共有したい・チームで運用したい場合は Slack 連携が向く。日次運用にする場合は無料枠を超え月数千円の課金が発生する点も天秤

判断3:なぜ Phase A は cron、Phase B 以降は GitHub Actions か

  • 採用理由: cron は学習コスト最小・即時動作確認可能。GitHub Actions は永続稼働+Secrets 管理+ログ蓄積が標準で揃う
  • 採用したことで失うもの: GitHub Actions の月2,000分制限(プライベート無料枠)。具体的シナリオ: 日次1回30秒なら月15分で余裕、銘柄ごとに別ジョブを並列化したい場合や複数の運用ジョブを並べる場合は枠を超え、Cloud Run / AWS Lambda 等への移行検討

本業の話:全数検査の並列化で出荷スループットを5倍にした経験

筆者が製造業の開発部門で 製品検査の全数自動化を担当していたとき、ベテランからもらった指示が「1個ずつではなく、複数台を並列に検査するラインを設計しろ」でした。当時は手動で1個ずつ検査していて、1日100台しか出荷できず、製造拠点には2週間分の滞貨が積み上がり、毎週末は残業で対応している状況でした。営業からは納期延長の謝罪電話が日常茶飯事で、現場の士気は確実に下がっていました。

初稿の自動化案では「検査機を1台導入して、自動で1個ずつ流す」設計でしたが、稟議で1ヶ月戻されたのが転機でした。「人手が機械に置き換わるだけで、スループット自体は変わらない」という指摘で、5並列のセル生産方式に書き直す方向に方針転換しました。

  • 1日 100台 → 500台/日(5倍)に出荷スループット向上、滞貨は3週間でゼロに
  • 検査機の初期投資コストは2倍(5台 vs 1台 + 並列制御)だったが、人件費削減・残業ゼロ・営業の謝罪工数ゼロ・納期短縮の合計で半年で投資回収
  • 導入直後は 異常通知が1日100通以上飛んできて、3日目には誰も読まない状態に。2週間かけて閾値・通知ルールを調整し、1日10通以下に削減。「件数が多すぎたら閾値見直しのトリガー」という運用がここで確立
  • 検査ラインのソフトウェアは 「N台の検査機が独立してジョブを取得し、結果を1つの DB にまとめる」パターンで設計。これは Python の multiprocessing.Pool + 共有 DB と本質的に同じ構造
  • 検査結果の異常は担当エンジニアの社内 chatbot に通知(本業では Microsoft Teams、本記事では LINE)。「全部の結果を見る」ではなく「異常だけ通知」の発想で運用負荷を下げた

レビューでベテランから受けた指摘:

  • 並列化の前に、判定の独立性を確保しろ。判定機間でデータ共有が必要な設計は並列化のメリットが消える」
  • 通知は要件定義の段階で「誰に・いつ・何を」を決めろ。実装してから通知を後付けすると、過剰通知や見落としが起きる」
  • cron で動かすなら、失敗時の通知も cron で出せ。検査ラインが止まってることに気づかないのが最悪」

逆方向の転移:投資パイプラインの経験が本業の検査自動化を改善した

本記事の自動化を組み始めて以降、本業の検査自動化にもこの設計が逆輸入されました。具体的には、業種補正テーブル(industry_indicator_map)の発想を本業に持ち込み、製品ファミリー別の検査閾値テーブルを設計し直しました。それまではコードに直接 if 分岐していた閾値が、テーブル駆動になることで、新製品ファミリー追加時のコード変更が不要に。スペックシート × データ駆動の組み合わせは、製造業 DX と投資データパイプラインの双方で同等以上の効果を出します。本記事の応用編シリーズ全体を通じて筆者自身の「設計の引き出し」が拡張された、という副産物がありました。

本記事の設計には、これらの教訓がそのまま反映されています:

  • 判定の独立性は記事 #14 の3層分離設計(取得・統合・分析)で確保済み
  • 通知は「誰に(自分)/いつ(朝6時)/何を(PASS銘柄N件+上位10件+免責注記)」を最初から決めてスニペット4を設計
  • 失敗通知はcron MAILTO と GitHub Actions on: failureでラインが止まっても気づける運用

まとめ:高配当株の自動化は「判定の独立性 × 並列化 × 通知優先度 × 定時運用」の4要素で完成する

  • 業種補正対応の判定器(spec_sheet_judge_v2.py)で USL/LSL/range を自動切替。direction カラムを使えば新しい業種・指標を追加するときも判定コードを書き換えずに対応できる。高配当株シリーズの基盤として応用編 #16〜#20 にも継続利用
  • multiprocessing.Pool で並列化(spawn 対応の initializer パターン必須)すれば、CPU コア数倍の高速化が見込める。判定の独立性を記事 #14 の3層分離設計で保証してあるからこそ、この並列化が成立する。製造業のセル生産方式と同型の embarrassingly parallel な構造
  • LINE Messaging API(Notify 廃止後の使い方)+ cron / GitHub Actions の組み合わせで日次〜月次のバッチ自動化。「PASS 銘柄N件」を朝6時に届ける運用が、高配当株スクリーニング自動化のミニマム到達点。継続コストは週次運用なら年2万円、日次運用なら年6万円が目安。応用編 #16 の時系列分析、#17 の罠銘柄検知が乗ることで、さらに精度の高い自動化に進化していく

今日からできる3つのアクション

  1. LINE Messaging API のチャネルを発行(5分、本記事のセットアップ手順参照)し、チャネルアクセストークンと User ID を .env に保存。本記事のスニペット4を実行して、テスト通知が自分の LINE に届くことを確認します(LINE Notify 廃止後の使い方)
  2. 本記事のスニペット1〜3を順に実行し、自分の data/screening_results/{今日の日付}.parquet が生成される状態を作る。判定結果を pandas で読み込み、PASS/CAUTION/FAIL の分布を集計してみる(例: df.groupby("overall").size())。PASS 件数が「適切な抽出範囲(10〜30件)」に収まっているかも確認
  3. cron または GitHub Actions のスケジュールに本記事のスクリプトを乗せて、週次(月曜朝6時)または日次の自動運用を1週間テスト。失敗時の通知(cron MAILTO / GitHub Actions on: failure)も合わせて設定すると、運用の安心感が桁違いに上がります

次回予告:配当推移の安定性を時系列分析で評価する(応用編 Phase 3 続き)

次回(記事#16)では、本記事の判定器が CAUTION で渡していた EPS トレンド・連続増配年数・営業CF 5年プラスの3指標を、時系列分析で自動評価する実装を扱います。pandas + matplotlib で過去配当履歴を可視化し、「連続増配何年か・配当の単調性・変動係数」を計算して判定器に渡せる形にします。

  • 過去5〜10年の EPS・配当履歴を pandas で時系列化
  • 変動係数(CV)・単調増加判定・移動平均で「持続性」を定量化
  • matplotlib で可視化し、結果を v_screening_input に追加するビュー設計

「製品開発DXエンジニアの投資術」シリーズ全体像

本記事は 応用編(記事#11〜#20)の第5回 です。応用編の DX フェーズマップでの位置づけ:

前回 #14 データ統合本記事 #15 自動化パイプライン | 次回 #16(公開予定)

関連記事(基礎編から): #03 複利のPython可視化#08 リスクとリターン#09 NISA・iDeCoの設計

免責事項(再掲)

本記事は投資助言を目的としたものではなく、技術・分析手法の紹介です。コード・スクリーニング結果・継続運用コスト試算は教育目的であり、特定の銘柄・金融商品の売買を推奨するものではありません。投資判断はご自身の責任で行ってください。J-Quants API・EDINET API・LINE Messaging API・GitHub Actions の利用規約・料金プラン・機能は変更される可能性があるため、実装時は各サービスの公式ドキュメント・利用規約を必ず確認してください。本記事中に J-Quants API から取得した実データは掲載していません(利用規約に基づく方針)。

よかったらシェアしてね!
  • URLをコピーしました!
  • URLをコピーしました!

この記事を書いた人

コメント

コメントする

CAPTCHA


目次