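I. What problem this document solves
LemonPicking can write A-share data fetched from third-party sources into its local database through a local REST API, so you do not have to depend entirely on paid interfaces such as Tushare.
This document explains how to use AKShare to fetch the A-share stock list, daily bars, and some valuation metrics, and how to call LemonPicking's local API to store them.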
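II. What you need
- A LemonPicking client that is installed and runs normally (on the same computer as this script, or in a network environment that can reach that computer's 127.0.0.1).
- Python 3.8+ (ideally close to the version LemonPicking uses).
- Install the dependency:
pip install akshare
- In LemonPicking, first enable the "API Sync Service" (「API 同步服务」, see the next section), and confirm that the port and the optional access token match the script's arguments.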
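III. How the script works with LemonPicking
1. Start the local API in the application
- Open LemonPicking and go to "Data Sync" (「数据同步」).
- Switch to the "API Sync Service" (「API 同步服务」) tab.
- Set the port (the default is usually 5010; use whatever your interface shows).
- (Strongly recommended) Set an access token: in the "Access Token" (「访问 Token」) field, enter a sufficiently long random string; the script sends the same string in an HTTP header.
- Click "Start Local API Service" (「开启本地 API 服务」).
The service listens on http://127.0.0.1:<port>. Because this is the loopback address, only programs on the same machine can connect, which greatly reduces the risk of calls from other devices on the LAN. The access token additionally guards against stray or malicious calls from other local scripts.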
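One way to produce a "sufficiently long random string" for the access token is Python's standard `secrets` module. This is only a sketch: the helper name `make_token` and the 32-byte length are assumptions chosen for illustration, not requirements of LemonPicking.

```python
import secrets


def make_token(n_bytes: int = 32) -> str:
    """Return a URL-safe random string to paste into the access token field."""
    return secrets.token_urlsafe(n_bytes)


if __name__ == "__main__":
    # Print one token; copy it into LemonPicking and pass the same value to --token.
    print(make_token())
```

Generate the token once, store it somewhere private, and reuse the same value in both the application and the script.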
2. How the script connects to LemonPicking
The script takes the API root address through --base-url, for example:
http://127.0.0.1:5010
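Before running a sync, it can help to confirm that something is actually listening at the base URL. The sketch below only tests whether a TCP connection succeeds; it does not call any LemonPicking endpoint, and the function name `api_port_open` is hypothetical.

```python
import socket
from urllib.parse import urlparse


def api_port_open(base_url: str, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to the host and port in base_url succeeds."""
    parsed = urlparse(base_url)
    host = parsed.hostname or "127.0.0.1"
    port = parsed.port or 80  # fall back to the HTTP default if no port is given
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, timed out, or host unreachable.
        return False


if __name__ == "__main__":
    print(api_port_open("http://127.0.0.1:5010"))
```

A result of False usually means the local API service has not been started or the port differs from the one shown in the application.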
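If you enabled a token in the application, add this to the script's arguments:
--token "the same token you set in the application"
Each request then automatically carries the header: Authorization: Bearer <token>.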
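To make the header handling concrete, here is a minimal sketch of how a request with the Bearer header can be assembled using only the standard library. It mirrors what the full script does in `post_json`; the helper name `build_request`, the placeholder token, and the trailing-slash trimming are illustrative assumptions.

```python
from urllib import request as urlrequest


def build_request(base_url: str, path: str, body: bytes, token: str = "") -> urlrequest.Request:
    """Build a POST request, adding the Authorization header only when a token is set."""
    headers = {"Content-Type": "application/json"}
    if token:
        headers["Authorization"] = f"Bearer {token}"
    # Trim a trailing slash so base_url + path never contains a double slash.
    url = base_url.rstrip("/") + path
    return urlrequest.Request(url=url, data=body, headers=headers, method="POST")


req = build_request("http://127.0.0.1:5010", "/api/local-sync/symbols", b"{}", token="example-token")
print(req.full_url)
print(req.get_header("Authorization"))
```

Nothing is sent here; the request object is only constructed so the URL and header can be inspected.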
3. Which endpoints the script calls (consistent with the in-app documentation)
| Step | Description | Path (relative to base-url) |
|---|---|---|
| Stock list | Writes symbols | POST /api/local-sync/symbols |
| Daily bars | Writes ohlcv_data | POST /api/local-sync/ohlcv-daily |
| Fundamentals / valuation snapshot | Writes daily_valuation | POST /api/local-sync/fundamentals |
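The request bodies for the three endpoints can be sketched as follows. The top-level keys (`symbols`, `records`) and the field names are taken from the full script in this document; every value is a made-up placeholder, and the fundamentals record is abbreviated to a few of its fields.

```python
import json

# Placeholder payloads keyed by endpoint path; values are illustrative only.
EXAMPLE_PAYLOADS = {
    "/api/local-sync/symbols": {
        "symbols": [
            {"ts_code": "600000.SH", "symbol": "600000", "name": "Example Co",
             "list_status": "L", "exchange": "SSE"},
        ]
    },
    "/api/local-sync/ohlcv-daily": {
        "records": [
            {"ts_code": "600000.SH", "trade_date": "20240102",
             "open": 10.0, "high": 10.5, "low": 9.8, "close": 10.2,
             "vol": 1000.0, "amount": 1020.0,
             "pct_chg": 2.0, "change": 0.2, "turnover_rate": 0.5},
        ]
    },
    "/api/local-sync/fundamentals": {
        "records": [
            {"ts_code": "600000.SH", "trade_date": "20240102", "close": 10.2,
             "pe": 5.0, "pb": 0.5, "total_mv": None},
        ]
    },
}

for path, payload in EXAMPLE_PAYLOADS.items():
    print(path, json.dumps(payload, ensure_ascii=False)[:60])
```

The script reads a `saved` count from each response to report how many records were written.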
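IV. Command-line arguments
| Argument | Default | Meaning |
|---|---|---|
| --base-url | http://127.0.0.1:5010 | Root address of the LemonPicking local API |
| --days | 180 | Number of days of daily bars to look back, counted from today |
| --limit-symbols | 0 | Sync only the first N stocks (applies to daily bars and fundamentals); 0 means all |
| --batch-size | 500 | Records per submitted batch (daily bars are written batch by batch while fetching) |
| --token | empty | Bearer token matching the one set in the application |
| --dry-run | off | Fetch and count only; do not call the write endpoints |
| --failed-output | failed_codes_akshare.json | Base path for the failure-detail JSON files; daily bars produce *_ohlcv.json and fundamentals produce *_fundamentals.json |
| --skip-symbols | off | Skip the stock list sync |
| --skip-ohlcv | off | Skip daily bars |
| --skip-fundamentals | off | Skip the fundamentals / valuation write |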
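The way --failed-output turns into two file names follows from the string replacement used in the script's `main()`. The sketch below reproduces that replacement; the function name `derived_paths` is hypothetical.

```python
def derived_paths(failed_output: str) -> dict:
    """Mirror the script's naming rule: insert a per-type suffix before '.json'."""
    return {
        "ohlcv": failed_output.replace(".json", "_ohlcv.json"),
        "fundamentals": failed_output.replace(".json", "_fundamentals.json"),
    }


print(derived_paths("failed_codes_akshare.json"))
```

Note that a path without ".json" in it is left unchanged by this rule, so both data types would then write to the same file; keeping the ".json" extension avoids that.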
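V. Recommended workflow
1. Start with a small trial run (recommended)
After confirming that the API is enabled, limit the number of stocks and shorten the look-back window:
python sync_from_akshare.py --base-url http://127.0.0.1:5010 --token "your-token" --limit-symbols 20 --days 30 --batch-size 200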
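2. Rehearse without writing to the database (dry-run)
python sync_from_akshare.py --dry-run --limit-symbols 50
Use this to check that the network and AKShare are working and to get a rough idea of the data volume.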
3. Full sync (takes a long time)
python src/scripts/sync_from_akshare.py --base-url http://127.0.0.1:5010 --token "your-token" --days 365 --batch-size 500
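4. Sync only certain kinds of data
- Daily bars only, without the stock list or fundamentals:
python sync_from_akshare.py --skip-symbols --skip-fundamentals --token "your-token"
- Fundamentals only (the stock list and daily bars are already in place):
python sync_from_akshare.py --skip-symbols --skip-ohlcv --token "your-token"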
VI. Full script
import argparse
import json
import os
import time
from datetime import datetime, timedelta
from typing import Dict, Iterable, List
from urllib import request as urlrequest
import akshare as ak


def post_json(url: str, payload: Dict, retries: int = 3, timeout: int = 60, token: str = "") -> Dict:
    """POST a JSON payload to the local API, retrying on failure."""
    data = json.dumps(payload, ensure_ascii=False).encode("utf-8")
    headers = {"Content-Type": "application/json"}
    if token:
        headers["Authorization"] = f"Bearer {token}"
    req = urlrequest.Request(
        url=url,
        data=data,
        headers=headers,
        method="POST",
    )
    last_error = None
    for i in range(retries):
        try:
            with urlrequest.urlopen(req, timeout=timeout) as resp:
                body = resp.read().decode("utf-8")
                return json.loads(body) if body else {"ok": True}
        except Exception as e:
            last_error = e
            # Back off before the next attempt (3 s, 5 s, 7 s, capped at 8 s);
            # skip the wait after the final attempt.
            if i < retries - 1:
                time.sleep(min(3 + i * 2, 8))
    raise RuntimeError(f"POST {url} failed: {last_error}")


def chunks(items: List[Dict], size: int) -> Iterable[List[Dict]]:
    """Yield consecutive slices of at most `size` items."""
    for i in range(0, len(items), size):
        yield items[i : i + size]


def to_ts_code(code: str) -> str:
    """Convert a bare stock code to a suffixed code such as 600000.SH."""
    c = str(code).strip()
    if "." in c:
        return c.upper()
    if c.startswith(("5", "6", "9")):
        return f"{c}.SH"
    # Note: every other code is labeled .SZ, including any Beijing Stock
    # Exchange codes the list may contain.
    return f"{c}.SZ"


def sync_symbols(base_url: str, batch_size: int, token: str = "", dry_run: bool = False):
    """Fetch the A-share code/name list and write it to the symbols endpoint."""
    df = ak.stock_info_a_code_name()
    rows = []
    for _, row in df.iterrows():
        code = str(row["code"]).strip()
        name = str(row["name"]).strip()
        rows.append(
            {
                "ts_code": to_ts_code(code),
                "symbol": code,
                "name": name,
                "list_status": "L",
                "exchange": "SSE" if code.startswith(("5", "6", "9")) else "SZSE",
            }
        )
    if dry_run:
        print(f"[AKShare][DRY-RUN] symbols preview count: {len(rows)}")
        return
    total = 0
    for part in chunks(rows, batch_size):
        r = post_json(f"{base_url}/api/local-sync/symbols", {"symbols": part}, token=token)
        total += int(r.get("saved", 0))
    print(f"[AKShare] symbols sync finished: {total}")


def sync_ohlcv(
    base_url: str,
    days: int,
    limit_symbols: int,
    batch_size: int,
    token: str = "",
    dry_run: bool = False,
    failed_output: str = "",
):
    """Fetch daily bars per stock and write them in batches while fetching."""
    code_df = ak.stock_info_a_code_name()
    symbols = [str(x).strip() for x in code_df["code"].tolist()]
    if limit_symbols > 0:
        symbols = symbols[:limit_symbols]
    start_date = (datetime.now() - timedelta(days=days)).strftime("%Y%m%d")
    end_date = datetime.now().strftime("%Y%m%d")
    pending_records: List[Dict] = []
    failed_codes: List[Dict] = []
    saved = 0
    for idx, code in enumerate(symbols, 1):
        try:
            hist = ak.stock_zh_a_hist(
                symbol=code, period="daily", start_date=start_date, end_date=end_date, adjust=""
            )
            if hist is None or hist.empty:
                continue
            ts_code = to_ts_code(code)
            for _, row in hist.iterrows():
                pending_records.append(
                    {
                        "ts_code": ts_code,
                        "trade_date": str(row["日期"]).replace("-", ""),
                        "open": float(row["开盘"]),
                        "high": float(row["最高"]),
                        "low": float(row["最低"]),
                        "close": float(row["收盘"]),
                        # Assumes AKShare reports volume in shares and converts to
                        # lots (1 lot = 100 shares); confirm the unit for your
                        # AKShare version before relying on this.
                        "vol": float(row["成交量"]) / 100.0,
                        "amount": float(row["成交额"]) / 1000.0,  # yuan -> thousand yuan
                        "pct_chg": float(row["涨跌幅"]),
                        "change": float(row["涨跌额"]),
                        "turnover_rate": float(row["换手率"]),
                    }
                )
            # Flush as soon as enough records have accumulated.
            if len(pending_records) >= batch_size:
                if dry_run:
                    saved += len(pending_records)
                    pending_records.clear()
                else:
                    try:
                        r = post_json(
                            f"{base_url}/api/local-sync/ohlcv-daily",
                            {"records": pending_records},
                            timeout=120,
                            token=token,
                        )
                        saved += int(r.get("saved", 0))
                        pending_records.clear()
                    except Exception as ex:
                        failed_codes.append({"code": code, "stage": "send_ohlcv_batch", "error": str(ex)})
                        pending_records.clear()
        except Exception as e:
            failed_codes.append({"code": code, "stage": "fetch_ohlcv", "error": str(e)})
            print(f"[AKShare] skipped {code}: {e}")
        if idx % 50 == 0:
            print(f"[AKShare] processed {idx}/{len(symbols)} stocks, records so far: {saved}")
    # Flush whatever is left after the last stock.
    if pending_records:
        if dry_run:
            saved += len(pending_records)
        else:
            try:
                r = post_json(
                    f"{base_url}/api/local-sync/ohlcv-daily",
                    {"records": pending_records},
                    timeout=120,
                    token=token,
                )
                saved += int(r.get("saved", 0))
            except Exception as ex:
                failed_codes.append({"code": "BATCH_FLUSH", "stage": "send_ohlcv_flush", "error": str(ex)})
    mode = "DRY-RUN estimate" if dry_run else "sync"
    print(f"[AKShare] ohlcv {mode} finished: {saved}")
    if failed_codes:
        out = failed_output or "failed_codes_akshare_ohlcv.json"
        with open(out, "w", encoding="utf-8") as f:
            json.dump(failed_codes, f, ensure_ascii=False, indent=2)
        print(f"[AKShare] ohlcv failure details written to: {os.path.abspath(out)} ({len(failed_codes)} entries)")


def sync_fundamentals(
    base_url: str,
    limit_symbols: int,
    batch_size: int,
    token: str = "",
    dry_run: bool = False,
    failed_output: str = "",
):
    """Fetch the latest valuation row per stock and write it to the fundamentals endpoint."""
    code_df = ak.stock_info_a_code_name()
    symbols = [str(x).strip() for x in code_df["code"].tolist()]
    if limit_symbols > 0:
        symbols = symbols[:limit_symbols]
    records: List[Dict] = []
    failed_codes: List[Dict] = []
    for code in symbols:
        try:
            ts_code = to_ts_code(code)
            # Legulegu valuation figures, usable as a supplementary source for daily_valuation
            ind = ak.stock_a_indicator_lg(symbol=code)
            if ind is None or ind.empty:
                continue
            latest = ind.iloc[-1]  # most recent row only
            trade_date = str(latest["trade_date"]).replace("-", "")
            records.append(
                {
                    "ts_code": ts_code,
                    "trade_date": trade_date,
                    "close": float(latest.get("close", 0) or 0),
                    "pe": _to_float(latest.get("pe")),
                    "pe_ttm": _to_float(latest.get("pe_ttm")),
                    "pb": _to_float(latest.get("pb")),
                    "ps": _to_float(latest.get("ps")),
                    "ps_ttm": _to_float(latest.get("ps_ttm")),
                    "dv_ratio": _to_float(latest.get("dv_ratio")),
                    "dv_ttm": _to_float(latest.get("dv_ttm")),
                    "total_mv": _to_float(latest.get("total_mv")),
                    "circ_mv": _to_float(latest.get("circ_mv")),
                }
            )
        except Exception as ex:
            failed_codes.append({"code": code, "stage": "fetch_fundamentals", "error": str(ex)})
            continue
    if dry_run:
        print(f"[AKShare][DRY-RUN] fundamentals preview count: {len(records)}")
        return
    saved = 0
    for part in chunks(records, batch_size):
        r = post_json(f"{base_url}/api/local-sync/fundamentals", {"records": part}, token=token)
        saved += int(r.get("saved", 0))
    print(f"[AKShare] fundamentals sync finished: {saved}")
    if failed_codes:
        out = failed_output or "failed_codes_akshare_fundamentals.json"
        with open(out, "w", encoding="utf-8") as f:
            json.dump(failed_codes, f, ensure_ascii=False, indent=2)
        print(f"[AKShare] fundamentals failure details written to: {os.path.abspath(out)} ({len(failed_codes)} entries)")


def _to_float(v):
    """Convert a value to float; return None for missing, blank, NaN, or unparsable input."""
    try:
        if v is None or str(v).strip() == "":
            return None
        f = float(v)
        # NaN is not valid standard JSON, so send null instead (NaN != NaN).
        return None if f != f else f
    except Exception:
        return None


def main():
    parser = argparse.ArgumentParser(description="Sync A-share data to the LemonPicking local API using AKShare")
    parser.add_argument("--base-url", default="http://127.0.0.1:5010", help="Local API address")
    parser.add_argument("--days", type=int, default=180, help="Number of days of OHLCV data to sync")
    parser.add_argument("--limit-symbols", type=int, default=0, help="Limit the number of stocks synced, 0 = all")
    parser.add_argument("--batch-size", type=int, default=500, help="Batch size for API submissions")
    parser.add_argument("--token", default="", help="Bearer token for the local API (if an access token is enabled)")
    parser.add_argument("--dry-run", action="store_true", help="Fetch and convert only; do not write to the local API")
    parser.add_argument("--failed-output", default="failed_codes_akshare.json", help="Base path for failure-detail JSON output (a suffix is added per data type)")
    parser.add_argument("--skip-symbols", action="store_true", help="Skip the stock list sync")
    parser.add_argument("--skip-ohlcv", action="store_true", help="Skip the daily bar sync")
    parser.add_argument("--skip-fundamentals", action="store_true", help="Skip the fundamentals sync")
    args = parser.parse_args()
    print(f"Starting sync: base_url={args.base_url}")
    if not args.skip_symbols:
        sync_symbols(args.base_url, args.batch_size, token=args.token, dry_run=args.dry_run)
    if not args.skip_ohlcv:
        sync_ohlcv(
            args.base_url,
            args.days,
            args.limit_symbols,
            args.batch_size,
            token=args.token,
            dry_run=args.dry_run,
            failed_output=args.failed_output.replace(".json", "_ohlcv.json"),
        )
    if not args.skip_fundamentals:
        sync_fundamentals(
            args.base_url,
            args.limit_symbols,
            args.batch_size,
            token=args.token,
            dry_run=args.dry_run,
            failed_output=args.failed_output.replace(".json", "_fundamentals.json"),
        )
    print("Sync finished")


if __name__ == "__main__":
    main()
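After a run, the failure-detail files written by the script can be used to decide what to retry. Each entry is an object with `code`, `stage`, and `error` keys, as written by the script above. The sketch below is one possible way to read such a file and list the distinct stock codes that failed at a given stage; the helper names `load_failures` and `codes_to_retry` and the sample entries are hypothetical.

```python
import json
from typing import Dict, List


def load_failures(path: str) -> List[Dict]:
    """Read a failure-detail JSON file produced by the sync script."""
    with open(path, encoding="utf-8") as f:
        return json.load(f)


def codes_to_retry(entries: List[Dict], stage: str = "fetch_ohlcv") -> List[str]:
    """Return distinct codes that failed at `stage`, in first-seen order."""
    seen: List[str] = []
    for entry in entries:
        code = entry.get("code")
        if entry.get("stage") == stage and code and code not in seen:
            seen.append(code)
    return seen


# Made-up entries in the same shape the script writes.
sample = [
    {"code": "000001", "stage": "fetch_ohlcv", "error": "timeout"},
    {"code": "000002", "stage": "send_ohlcv_batch", "error": "HTTP 500"},
    {"code": "000001", "stage": "fetch_ohlcv", "error": "timeout"},
]
print(codes_to_retry(sample))
```

Entries with the stage `send_ohlcv_batch` record only the stock being processed when a batch failed, and a batch may hold records from several stocks, so a failed batch is safer to recover by re-running the sync for the affected period.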