EPUSDT 后台接入 OKX 欧易实时汇率教程
先说明
这篇教程只做一件事:把 EPUSDT 的 CNY -> USDT 汇率改成 OKX 欧易实时汇率,并且让新订单直接按实时汇率出金额。
这版 epusdt 有几个坑要先说明清楚:
- 后台
汇率 API 地址不能直接填 OKX 官方地址,epusdt 只认{"cny":{"usdt":0.14636}}这种返回格式。 - 只填
汇率 API 地址还不够。这个版本在USDT + CNY场景实际还会用强制 USDT 汇率。后台这个输入框现在通常是灰色不可编辑,这是正常的,所以要让本机服务自动同步它。 - 这版脚本会同时补齐
cny.json、usd.json和其它币种汇率,解决TRON -> TRX切换时报系统错误的问题。
下面这套做法就是同时解决这几个问题:本机服务一边提供 epusdt 能识别的 JSON,一边自动登录 epusdt 后台 API,把 rate.forced_usdt_rate 同步成 OKX 实时值。
步骤 1:创建目录
在服务器里新建目录 /opt/epusdt-okx-rate ,然后进入这个目录。
mkdir -p /opt/epusdt-okx-rate
cd /opt/epusdt-okx-rate
步骤 2:新建 server.py
#!/usr/bin/env python3
import json
import logging
import os
import re
import sqlite3
import threading
import time
import urllib.error
import urllib.parse
import urllib.request
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from typing import Dict, Optional
HOST = "127.0.0.1"
PORT = 18089
CACHE_TTL_SECONDS = 10
REFRESH_INTERVAL_SECONDS = 10
EPUSDT_DB_PATH = "/www/wwwroot/epusdt/epusdt.db"
EPUSDT_ADMIN_API_BASE = os.getenv("EPUSDT_ADMIN_API_BASE", "http://127.0.0.1:8000/admin/api/v1")
EPUSDT_ADMIN_USERNAME = os.getenv("EPUSDT_ADMIN_USERNAME", "")
EPUSDT_ADMIN_PASSWORD = os.getenv("EPUSDT_ADMIN_PASSWORD", "")
USER_AGENT = (
"Mozilla/5.0 (X11; Linux x86_64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/135.0.0.0 Safari/537.36"
)
OKX_EXCHANGE_URL = "https://www.okx.com/zh-hans/exchange/usdt-to-cny"
OKX_CONVERT_URL = "https://www.okx.com/zh-hans/convert/cny-to-usdt"
OKX_QUOTED_PRICE_URL = (
"https://www.okx.com/priapi/v3/b2c/deposit/quotedPrice"
"?baseCurrency=USDT"eCurrency=CNY"
)
PUBLIC_CURRENCY_API_URL = (
"https://cdn.jsdelivr.net/npm/@fawazahmed0/currency-api@latest/v1/currencies/{base}.json"
)
PRODUCT_PRICE_RE = re.compile(r'"price":"([0-9]+(?:\.[0-9]+)?)","priceCurrency":"CNY"')
DIRECT_RATE_RE_LIST = (
re.compile(r'当前 1 CNY 可兑换 ([0-9]+(?:\.[0-9]+)?) USDT'),
re.compile(r'CNY/USDT 今天的兑换率为 ([0-9]+(?:\.[0-9]+)?) USDT'),
)
_cache_lock = threading.Lock()
_cache = {"ts": 0.0, "rate": None, "source": None}
_public_cache_lock = threading.Lock()
_public_cache: Dict[str, dict] = {}
_admin_token_lock = threading.Lock()
_admin_token: Optional[str] = None
def format_float(value: float) -> str:
return f"{value:.8f}".rstrip("0").rstrip(".")
def sync_forced_usdt_rate_via_db(value: str, source: str, cny_to_usdt_rate: float) -> None:
conn = sqlite3.connect(EPUSDT_DB_PATH, timeout=5)
try:
conn.execute("PRAGMA busy_timeout = 5000")
conn.execute(
"""
INSERT INTO settings
("group", "key", "value", "type", "description", "created_at", "updated_at", "deleted_at")
VALUES
('rate', 'rate.forced_usdt_rate', ?, 'string', '强制USDT汇率', CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, NULL)
ON CONFLICT("key") DO UPDATE SET
"value" = excluded."value",
"type" = 'string',
"description" = '强制USDT汇率',
"updated_at" = CURRENT_TIMESTAMP,
"deleted_at" = NULL
""",
(value,),
)
conn.commit()
logging.warning(
"fallback synced rate.forced_usdt_rate=%s via sqlite from %s (cny->usdt=%s)",
value,
source,
format_float(cny_to_usdt_rate),
)
finally:
conn.close()
def http_get_text(url: str, timeout: int = 8) -> str:
req = urllib.request.Request(url, headers={"User-Agent": USER_AGENT})
with urllib.request.urlopen(req, timeout=timeout) as resp:
return resp.read().decode("utf-8", errors="ignore")
def fetch_public_currency_payload(base: str) -> dict:
payload = json.loads(http_get_text(PUBLIC_CURRENCY_API_URL.format(base=base)))
data = payload.get(base)
if not isinstance(data, dict):
raise ValueError(f"invalid currency payload for {base}")
return data
def get_public_currency_payload(base: str, force_refresh: bool = False) -> dict:
now = time.time()
with _public_cache_lock:
cached = _public_cache.get(base)
if (
cached is not None
and not force_refresh
and now - cached["ts"] < CACHE_TTL_SECONDS
):
return dict(cached["data"])
data = fetch_public_currency_payload(base)
with _public_cache_lock:
_public_cache[base] = {"ts": now, "data": dict(data)}
return dict(data)
def http_request_json(
url: str,
method: str = "GET",
payload: Optional[dict] = None,
headers: Optional[Dict[str, str]] = None,
timeout: int = 8,
) -> dict:
body = None
request_headers = {"User-Agent": USER_AGENT}
if headers:
request_headers.update(headers)
if payload is not None:
body = json.dumps(payload, ensure_ascii=True, separators=(",", ":")).encode()
request_headers["Content-Type"] = "application/json"
req = urllib.request.Request(url, data=body, headers=request_headers, method=method)
with urllib.request.urlopen(req, timeout=timeout) as resp:
return json.loads(resp.read().decode("utf-8"))
def get_admin_token(force_refresh: bool = False) -> str:
global _admin_token
if not EPUSDT_ADMIN_USERNAME or not EPUSDT_ADMIN_PASSWORD:
raise ValueError("missing EPUSDT_ADMIN_USERNAME / EPUSDT_ADMIN_PASSWORD")
with _admin_token_lock:
if _admin_token and not force_refresh:
return _admin_token
payload = {
"username": EPUSDT_ADMIN_USERNAME,
"password": EPUSDT_ADMIN_PASSWORD,
}
resp = http_request_json(
f"{EPUSDT_ADMIN_API_BASE}/auth/login",
method="POST",
payload=payload,
)
token = ((resp.get("data") or {}).get("token") or "").strip()
if not token:
raise ValueError(f"admin login failed: {resp}")
_admin_token = token
return token
def sync_forced_usdt_rate(cny_to_usdt_rate: float, source: str) -> None:
if cny_to_usdt_rate <= 0:
raise ValueError("invalid cny->usdt rate")
usdt_to_cny_rate = 1.0 / cny_to_usdt_rate
value = format_float(usdt_to_cny_rate)
payload = {
"items": [
{
"group": "rate",
"key": "rate.forced_usdt_rate",
"value": value,
"type": "string",
}
]
}
for force_refresh in (False, True):
try:
token = get_admin_token(force_refresh=force_refresh)
resp = http_request_json(
f"{EPUSDT_ADMIN_API_BASE}/settings",
method="PUT",
payload=payload,
headers={"Authorization": f"Bearer {token}"},
)
results = resp.get("data") or []
if not results or not results[0].get("ok"):
raise ValueError(f"settings upsert failed: {resp}")
logging.info(
"synced rate.forced_usdt_rate=%s via admin api from %s (cny->usdt=%s)",
value,
source,
format_float(cny_to_usdt_rate),
)
return
except Exception: # noqa: BLE001
if force_refresh:
logging.exception("sync via admin api failed, falling back to sqlite")
sync_forced_usdt_rate_via_db(value, source, cny_to_usdt_rate)
def fetch_rate_from_exchange_page() -> tuple[float, str]:
html = http_get_text(OKX_EXCHANGE_URL)
match = PRODUCT_PRICE_RE.search(html)
if match:
price_cny_per_usdt = float(match.group(1))
if price_cny_per_usdt > 0:
return round(1.0 / price_cny_per_usdt, 12), "okx-exchange-page"
for regex in DIRECT_RATE_RE_LIST:
match = regex.search(html)
if match:
return round(float(match.group(1)), 12), "okx-exchange-page-faq"
raise ValueError("failed to parse OKX exchange page")
def fetch_rate_from_convert_page() -> tuple[float, str]:
html = http_get_text(OKX_CONVERT_URL)
match = PRODUCT_PRICE_RE.search(html)
if match:
price_cny_per_usdt = float(match.group(1))
if price_cny_per_usdt > 0:
return round(1.0 / price_cny_per_usdt, 12), "okx-convert-page"
for regex in DIRECT_RATE_RE_LIST:
match = regex.search(html)
if match:
return round(float(match.group(1)), 12), "okx-convert-page-faq"
raise ValueError("failed to parse OKX convert page")
def fetch_rate_from_quote_api() -> tuple[float, str]:
payload = json.loads(http_get_text(OKX_QUOTED_PRICE_URL))
items = payload.get("data") or []
if not items:
raise ValueError("quotedPrice returned empty data")
price_cny_per_usdt = float(items[0]["price"])
if price_cny_per_usdt <= 0:
raise ValueError("quotedPrice returned invalid price")
return round(1.0 / price_cny_per_usdt, 12), "okx-quotedPrice"
def fetch_okx_cny_to_usdt_rate() -> tuple[float, str]:
errors = []
for fetcher in (
fetch_rate_from_exchange_page,
fetch_rate_from_convert_page,
fetch_rate_from_quote_api,
):
try:
return fetcher()
except Exception as exc: # noqa: BLE001
errors.append(f"{fetcher.__name__}: {exc}")
raise RuntimeError("; ".join(errors))
def get_okx_cny_to_usdt_rate(force_refresh: bool = False) -> tuple[float, str]:
now = time.time()
with _cache_lock:
if (
not force_refresh
and _cache["rate"] is not None
and now - _cache["ts"] < CACHE_TTL_SECONDS
):
return _cache["rate"], _cache["source"]
rate, source = fetch_okx_cny_to_usdt_rate()
sync_forced_usdt_rate(rate, source)
_cache["ts"] = now
_cache["rate"] = rate
_cache["source"] = source
return rate, source
def refresh_loop() -> None:
while True:
try:
get_okx_cny_to_usdt_rate(force_refresh=True)
except Exception: # noqa: BLE001
logging.exception("background refresh failed")
time.sleep(REFRESH_INTERVAL_SECONDS)
class Handler(BaseHTTPRequestHandler):
server_version = "epusdt-okx-rate/1.0"
def do_GET(self) -> None: # noqa: N802
parsed = urllib.parse.urlparse(self.path)
force_refresh = urllib.parse.parse_qs(parsed.query).get("refresh") == ["1"]
if parsed.path == "/healthz":
self.write_json(200, {"ok": True})
return
if parsed.path == "/":
self.write_json(
200,
{
"service": "epusdt okx rate adapter",
"usage": {
"cny": "GET /cny.json",
"healthz": "GET /healthz",
},
},
)
return
if not parsed.path.endswith(".json"):
self.write_json(404, {"error": "not found"})
return
base = parsed.path.rsplit("/", 1)[-1][:-5].strip().lower()
if not base:
self.write_json(404, {"error": "not found"})
return
try:
if base == "cny":
rate, source = get_okx_cny_to_usdt_rate(force_refresh=force_refresh)
data = get_public_currency_payload("cny", force_refresh=force_refresh)
# Keep USDT pegged to the OKX-derived rate used by epusdt.
data["usdt"] = rate
self.write_json(200, {"cny": data}, {"X-Rate-Source": source})
return
if base == "usd":
data = get_public_currency_payload("usd", force_refresh=force_refresh)
data["usdt"] = 1
self.write_json(200, {"usd": data})
return
self.write_json(200, {base: get_public_currency_payload(base, force_refresh=force_refresh)})
except Exception as exc: # noqa: BLE001
logging.exception("failed to serve %s", parsed.path)
self.write_json(500, {"error": str(exc)})
def log_message(self, fmt: str, *args) -> None:
logging.info("%s - %s", self.address_string(), fmt % args)
def write_json(
self, status: int, payload: dict, extra_headers: Optional[Dict[str, str]] = None
) -> None:
body = json.dumps(payload, ensure_ascii=True, separators=(",", ":")).encode()
self.send_response(status)
self.send_header("Content-Type", "application/json; charset=utf-8")
self.send_header("Content-Length", str(len(body)))
self.send_header("Cache-Control", "no-store")
if extra_headers:
for key, value in extra_headers.items():
self.send_header(key, value)
self.end_headers()
self.wfile.write(body)
def main() -> None:
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(message)s",
)
threading.Thread(target=refresh_loop, daemon=True).start()
server = ThreadingHTTPServer((HOST, PORT), Handler)
logging.info("listening on http://%s:%s", HOST, PORT)
server.serve_forever()
if __name__ == "__main__":
main()
步骤 3:新建 systemd 服务文件
新建文件 /etc/systemd/system/epusdt-okx-rate.service ,把下面内容整段复制进去保存。
[Unit]
Description=EPUSDT OKX Rate Adapter
After=network-online.target
Wants=network-online.target
[Service]
Type=simple
WorkingDirectory=/opt/epusdt-okx-rate
Environment=PYTHONUNBUFFERED=1
Environment=EPUSDT_ADMIN_API_BASE=http://127.0.0.1:8000/admin/api/v1
Environment=EPUSDT_ADMIN_USERNAME=你的后台账号
Environment=EPUSDT_ADMIN_PASSWORD=你的后台密码
ExecStart=/usr/bin/python3 /opt/epusdt-okx-rate/server.py
Restart=always
RestartSec=2
[Install]
WantedBy=multi-user.target
EPUSDT_ADMIN_API_BASE 填 epusdt 程序本机实际监听的后台 API 地址,常见是 http://127.0.0.1:8000/admin/api/v1 。如果你的 epusdt 不是这个端口,就改成你自己的本机地址。
EPUSDT_ADMIN_USERNAME 和 EPUSDT_ADMIN_PASSWORD 改成你自己的 epusdt 后台登录账号密码。
步骤 4:启动服务并先检查自动同步
文件保存好后,再执行下面命令。
systemctl daemon-reload
systemctl enable --now epusdt-okx-rate.service
curl http://127.0.0.1:18089/cny.json
curl http://127.0.0.1:18089/trx.json
journalctl -u epusdt-okx-rate.service -n 20 --no-pager
这里必须同时满足下面几个结果再继续:
curl http://127.0.0.1:18089/cny.json要返回包含usdt的 JSONcurl http://127.0.0.1:18089/trx.json不能是空对象- 日志里要看到
synced rate.forced_usdt_rate=...,最好是带via admin api
如果日志里只有 fallback synced ... via sqlite 或者出现 admin login failed ,先检查上一步的后台 API 地址、账号、密码,不要继续往下测。
步骤 5:进入 epusdt 后台填写汇率地址
登录 epusdt 后台,打开 系统配置 -> 汇率配置 ,把 汇率 API 地址 改成 http://127.0.0.1:18089/ 并保存。

新版 epusdt 后台已经支持手动填写 强制 USDT 汇率 。如果你看到这个框可编辑,这是正常的;如果仍然是灰色,通常是旧版本或前端缓存未更新。
但如果你是按本教程部署这套本机服务,就不需要手动长期维护这个值,也不要手改数据库。上面的本机服务会通过后台 API 自动同步;如果你手动填了固定值,后续也可能被自动同步覆盖。
不要把 OKX 官方接口原地址直接填到 汇率 API 地址 这里,这个版本的 epusdt 识别不了官方返回格式。
步骤 6:确认新订单已经按实时汇率出金额
正常情况下,这套做法不需要手动反复改 强制 USDT 汇率 ,也不用再靠重启 epusdt 去吃数据库值。前面的服务会直接通过后台 API 同步运行中的汇率配置。
最后按下面规则检查:
- 重新新建一笔订单测试,不要看旧订单。旧订单金额不会自动变。
- 订单金额要能跟着当前 OKX 实时汇率变化。
- 如果刚好有未支付旧单占用了同一个金额,epusdt 会自动加
0.01防撞单,这是正常现象,不是汇率错了。 - 切到
TRON -> TRX不应再报系统错误。
如果你按完上面步骤后,日志里已经出现 via admin api , curl http://127.0.0.1:18089/trx.json 也不是空对象,并且新订单金额能跟着 OKX 实时变化,这篇教程就完成了。
文章作者:大神K
版权说明:本文为原创内容,转载请注明出处。