重要说明:为何更推荐 AKShare

BaoStock 只可获取「股票列表」和「日 K 线」,不包含写入基本面与估值指标接口。
若你需要在 LemonPicking 里使用基本面指标做过滤,建议优先使用 AKShare(详见 使用 AKShare 将 A 股数据同步到 LemonPicking)。

BaoStock 仍适合:环境轻量、仅需行情与标的列表、或作为 AKShare 不可用时的备选。

一、这篇文档解决什么问题

文档说明如何使用 BaoStock 拉取 A 股全市场代码与日 K,并调用 LemonPicking 本地 REST API 写入本机数据库。

二、你需要准备什么

  1. LemonPicking 客户端 可正常运行,并已在本机开启 API 同步服务(端口、Token 与脚本一致)。
  2. Python 3.8+。
  3. 安装依赖:
pip install baostock
  1. 脚本运行前会执行 baostock.login(),结束时会 logout(),需保证本机能访问 BaoStock 服务。

三、与 LemonPicking 如何配合

1. 在软件里启动本地 API

  1. 打开 LemonPicking → 数据同步 → API 同步服务。
  2. 设置 端口(例如 5010,以界面为准)。
  3. (强烈建议)设置 访问 Token,脚本使用 --token 传入相同字符串。
  4. 点击 开启本地 API 服务。根地址一般为:http://127.0.0.1:<端口>

2. 脚本调用的接口

步骤写入表路径
股票列表symbolsPOST /api/local-sync/symbols
日 Kohlcv_dataPOST /api/local-sync/ohlcv-daily

不调用 POST /api/local-sync/fundamentals(BaoStock 侧无对应完整指标源)。

若启用 Token,请求头为:Authorization: Bearer <token>

四、命令行参数说明

参数默认值含义
--base-urlhttp://127.0.0.1:5010本地 API 根地址
--days180日 K 回溯天数
--limit-symbols0只处理前 N 只标的,0 为全部
--batch-size500批量提交大小;日 K 满批即写入
--trade-date当天 YYYY-MM-DD拉取股票列表时使用的交易日(query_all_stock
--tokenBearer Token
--dry-run关闭不调用写入接口,仅统计/演练
--failed-outputfailed_codes_baostock.json失败明细 JSON 路径
--skip-symbols跳过列表同步
--skip-ohlcv跳过日 K

五、推荐使用流程

1. 小规模试跑

python sync_from_baostock.py ^

--base-url http://127.0.0.1:5010 ^

--token "你的Token" ^

--limit-symbols 30 ^

--days 60 ^

--batch-size 200

2. 演练(不写库)

python sync_from_baostock.py --dry-run --limit-symbols 50

3. 仅同步日 K(列表已在库)

python sync_from_baostock.py --skip-symbols --token "你的Token"

4. 股票列表与交易日

列表来自 query_all_stock(day=trade_date)。若当天非交易日或返回异常,可将 --trade-date 改为最近一个交易日,例如:

python sync_from_baostock.py --trade-date 2026-03-28 --token "你的Token"

六、完整脚本

import argparse
import json
import os
import time
from datetime import datetime, timedelta
from typing import Dict, Iterable, List
from urllib import request as urlrequest

import baostock as bs


def post_json(url: str, payload: Dict, retries: int = 3, timeout: int = 60, token: str = "") -> Dict:
    data = json.dumps(payload, ensure_ascii=False).encode("utf-8")
    headers = {"Content-Type": "application/json"}
    if token:
        headers["Authorization"] = f"Bearer {token}"
    req = urlrequest.Request(
        url=url,
        data=data,
        headers=headers,
        method="POST",
    )
    last_error = None
    for i in range(retries):
        try:
            with urlrequest.urlopen(req, timeout=timeout) as resp:
                body = resp.read().decode("utf-8")
                return json.loads(body) if body else {"ok": True}
        except Exception as e:
            last_error = e
            time.sleep(min(3 + i * 2, 8))
    raise RuntimeError(f"POST {url} 失败: {last_error}")


def chunks(items: List[Dict], size: int) -> Iterable[List[Dict]]:
    for i in range(0, len(items), size):
        yield items[i : i + size]


def to_ts_code(bao_code: str) -> str:
    # baostock: sh.600000 / sz.000001 / bj.920992
    c = str(bao_code).strip().lower()
    if "." not in c:
        return c
    ex, code = c.split(".", 1)
    suffix = {"sh": "SH", "sz": "SZ", "bj": "BJ"}.get(ex, ex.upper())
    return f"{code}.{suffix}"


def to_symbol(bao_code: str) -> str:
    c = str(bao_code).strip().lower()
    if "." not in c:
        return c
    return c.split(".", 1)[1]


def sync_symbols(base_url: str, trade_date: str, batch_size: int, token: str = "", dry_run: bool = False):
    rs = bs.query_all_stock(day=trade_date)
    rows = []
    while rs.error_code == "0" and rs.next():
        row = rs.get_row_data()
        code = row[0]
        name = row[2] if len(row) > 2 else ""
        rows.append(
            {
                "ts_code": to_ts_code(code),
                "symbol": to_symbol(code),
                "name": name or to_symbol(code),
                "list_status": "L",
                "exchange": code.split(".", 1)[0].upper(),
            }
        )
    if dry_run:
        print(f"[BaoStock][DRY-RUN] symbols 预览条数: {len(rows)}")
        return
    total = 0
    for part in chunks(rows, batch_size):
        r = post_json(f"{base_url}/api/local-sync/symbols", {"symbols": part}, token=token)
        total += int(r.get("saved", 0))
    print(f"[BaoStock] symbols 同步完成: {total}")


def sync_ohlcv(
    base_url: str,
    days: int,
    limit_symbols: int,
    batch_size: int,
    token: str = "",
    dry_run: bool = False,
    failed_output: str = "",
):
    start_date = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
    end_date = datetime.now().strftime("%Y-%m-%d")
    day = datetime.now().strftime("%Y-%m-%d")

    rs = bs.query_all_stock(day=day)
    all_codes = []
    while rs.error_code == "0" and rs.next():
        all_codes.append(rs.get_row_data()[0])
    if limit_symbols > 0:
        all_codes = all_codes[:limit_symbols]

    pending_records: List[Dict] = []
    saved = 0
    failed_codes: List[Dict] = []
    for idx, code in enumerate(all_codes, 1):
        try:
            q = bs.query_history_k_data_plus(
                code,
                "date,open,high,low,close,preclose,volume,amount,turn,pctChg",
                start_date=start_date,
                end_date=end_date,
                frequency="d",
                adjustflag="3",
            )
            if q.error_code != "0":
                failed_codes.append({"code": code, "stage": "fetch", "error": q.error_msg})
                continue
            while q.error_code == "0" and q.next():
                r = q.get_row_data()
                pending_records.append(
                    {
                        "ts_code": to_ts_code(code),
                        "trade_date": r[0].replace("-", ""),
                        "open": float(r[1]),
                        "high": float(r[2]),
                        "low": float(r[3]),
                        "close": float(r[4]),
                        "pre_close": float(r[5]) if r[5] else None,
                        "volume": float(r[6]) if r[6] else 0.0,
                        "amount": (float(r[7]) / 1000.0) if r[7] else None,
                        "turnover_rate": float(r[8]) if r[8] else None,
                        "pct_chg": float(r[9]) if r[9] else None,
                    }
                )
                if len(pending_records) >= batch_size:
                    if dry_run:
                        saved += len(pending_records)
                        pending_records.clear()
                    else:
                        r = post_json(
                            f"{base_url}/api/local-sync/ohlcv-daily",
                            {"records": pending_records},
                            timeout=120,
                            token=token,
                        )
                        saved += int(r.get("saved", 0))
                        pending_records.clear()
        except Exception as ex:
            failed_codes.append({"code": code, "stage": "fetch_or_send", "error": str(ex)})
        if idx % 50 == 0:
            print(f"[BaoStock] 已抓取并写入 {idx}/{len(all_codes)} 只股票,当前已入库: {saved}")

    if pending_records:
        if dry_run:
            saved += len(pending_records)
        else:
            r = post_json(
                f"{base_url}/api/local-sync/ohlcv-daily",
                {"records": pending_records},
                timeout=120,
                token=token,
            )
            saved += int(r.get("saved", 0))
    mode = "DRY-RUN 预估" if dry_run else "同步"
    print(f"[BaoStock] ohlcv {mode}完成: {saved}")
    if failed_codes:
        out = failed_output or "failed_codes_baostock.json"
        with open(out, "w", encoding="utf-8") as f:
            json.dump(failed_codes, f, ensure_ascii=False, indent=2)
        print(f"[BaoStock] 失败股票明细已写入: {os.path.abspath(out)} ({len(failed_codes)}条)")


def main():
    parser = argparse.ArgumentParser(description="使用 BaoStock 同步 A 股数据到 LemonPicking 本地 API")
    parser.add_argument("--base-url", default="http://127.0.0.1:5010", help="本地 API 地址")
    parser.add_argument("--days", type=int, default=180, help="OHLCV 同步天数")
    parser.add_argument("--limit-symbols", type=int, default=0, help="限制同步股票数,0=全部")
    parser.add_argument("--batch-size", type=int, default=500, help="接口批量提交大小")
    parser.add_argument("--trade-date", default=datetime.now().strftime("%Y-%m-%d"), help="拉取股票列表用交易日")
    parser.add_argument("--token", default="", help="本地 API Bearer Token(如启用了访问Token)")
    parser.add_argument("--dry-run", action="store_true", help="仅抓取与转换,不写入本地API")
    parser.add_argument("--failed-output", default="failed_codes_baostock.json", help="失败代码输出JSON路径")
    parser.add_argument("--skip-symbols", action="store_true", help="跳过股票列表同步")
    parser.add_argument("--skip-ohlcv", action="store_true", help="跳过日K同步")
    args = parser.parse_args()

    lg = bs.login()
    if lg.error_code != "0":
        raise RuntimeError(f"baostock 登录失败: {lg.error_msg}")

    try:
        print(f"开始同步: base_url={args.base_url}")
        if not args.skip_symbols:
            sync_symbols(args.base_url, args.trade_date, args.batch_size, token=args.token, dry_run=args.dry_run)
        if not args.skip_ohlcv:
            sync_ohlcv(
                args.base_url,
                args.days,
                args.limit_symbols,
                args.batch_size,
                token=args.token,
                dry_run=args.dry_run,
                failed_output=args.failed_output,
            )
        print("同步完成")
    finally:
        bs.logout()


if __name__ == "__main__":
    main()