Files
shusub2/sub2api_quota_tui.py
T
2026-06-09 15:06:26 +08:00

439 lines
16 KiB
Python

#!/usr/bin/env python3
"""Textual TUI for Sub2API quota and daily account usage."""
from __future__ import annotations
import argparse
import datetime as dt
import json
import os
import sys
import urllib.parse
import urllib.request
from pathlib import Path
from typing import Any
DEFAULT_API_URL = "http://127.0.0.1:18318/api/tui/accounts"
DEFAULT_CONFIG_FILE = "~/.config/shusub2/api-url"
DEFAULT_REFRESH_SECONDS = 60
DEFAULT_TIMEOUT_SECONDS = 10
def env_int(name: str, default: int, *, minimum: int = 1) -> int:
try:
return max(minimum, int(os.environ.get(name, default)))
except Exception:
return default
def default_api_url() -> str:
for name in ("SHUSUB2_API_URL", "SUB2API_QUOTA_TUI_API_URL"):
value = os.environ.get(name, "").strip()
if value:
return value
config_file = Path(os.environ.get("SHUSUB2_API_URL_FILE", DEFAULT_CONFIG_FILE)).expanduser()
try:
for line in config_file.read_text(encoding="utf-8").splitlines():
value = line.strip()
if value and not value.startswith("#"):
return value
except OSError:
pass
return DEFAULT_API_URL
def as_float(value: Any) -> float:
try:
if value is None or str(value).strip() == "":
return 0.0
return float(value)
except Exception:
return 0.0
def as_int(value: Any) -> int:
try:
if value is None or str(value).strip() == "":
return 0
return int(float(value))
except Exception:
return 0
def format_cost(value: Any) -> str:
amount = as_float(value)
if amount == 0:
return "$0"
if abs(amount) < 0.01:
return f"${amount:.6f}".rstrip("0").rstrip(".")
if abs(amount) < 10:
return f"${amount:.3f}".rstrip("0").rstrip(".")
return f"${amount:.2f}".rstrip("0").rstrip(".")
def format_count(value: Any) -> str:
number = as_int(value)
if abs(number) >= 1_000_000:
return f"{number / 1_000_000:.1f}M".rstrip("0").rstrip(".")
if abs(number) >= 1_000:
return f"{number / 1_000:.1f}K".rstrip("0").rstrip(".")
return str(number)
def format_percent(value: Any) -> str:
if value is None or str(value).strip() == "":
return "-"
percent = as_float(value)
rounded = round(percent, 1)
if rounded.is_integer():
return f"{int(rounded)}%"
return f"{rounded}%"
def parse_time(value: Any) -> dt.datetime | None:
text = str(value or "").strip()
if not text:
return None
try:
normalized = text[:-1] + "+00:00" if text.endswith("Z") else text
return dt.datetime.fromisoformat(normalized)
except Exception:
return None
def short_time(value: Any) -> str:
parsed = parse_time(value)
if not parsed:
return "-"
return parsed.astimezone().strftime("%m-%d %H:%M")
def add_refresh_param(api_url: str, refresh: bool) -> str:
if not refresh:
return api_url
parsed = urllib.parse.urlparse(api_url)
query = urllib.parse.parse_qsl(parsed.query, keep_blank_values=True)
query = [(key, value) for key, value in query if key != "refresh"]
query.append(("refresh", "1"))
return urllib.parse.urlunparse(parsed._replace(query=urllib.parse.urlencode(query)))
def fetch_payload(api_url: str, timeout: int, *, refresh: bool = False) -> dict[str, Any]:
req = urllib.request.Request(add_refresh_param(api_url, refresh), headers={"Accept": "application/json"})
with urllib.request.urlopen(req, timeout=timeout) as response:
data = json.loads(response.read().decode("utf-8"))
if not isinstance(data, dict):
raise RuntimeError("API did not return a JSON object")
return data
def kind_label(kind: Any) -> str:
return {
"quota_limited": "quota",
"pending_quota": "pending",
"daily_limited": "daily",
"pay_as_you_go": "usage",
"disabled": "disabled",
}.get(str(kind or ""), str(kind or "unknown"))
def provider_label(account: dict[str, Any]) -> str:
text = str(account.get("provider") or account.get("platform") or "").strip().lower()
if not text:
return "-"
if "anthropic" in text or "claude" in text:
return "anthropic"
if "openai" in text or "chatgpt" in text:
return "openai"
return text
def routing_group_sort_rank(value: Any) -> int:
return {"sfast": 0, "fast": 1, "slow": 2, "id": 3, "": 4, "-": 4}.get(str(value or "").strip().lower(), 4)
def window_for(account: dict[str, Any], window_id: str) -> dict[str, Any]:
for window in account.get("windows") or []:
if isinstance(window, dict) and window.get("id") == window_id:
return window
return {}
def window_cell(account: dict[str, Any], window_id: str) -> str:
window = window_for(account, window_id)
if not window or window.get("used_percent") is None:
return "-"
return f"{format_percent(window.get('used_percent'))}/{format_percent(window.get('remaining_percent'))}"
def daily_quota_cell(account: dict[str, Any]) -> str:
limit = account.get("daily_quota_limit")
used = account.get("daily_quota_used")
if limit is None and used is None:
return "-"
if limit is None:
return str(used)
return f"{used or 0}/{limit}"
def reset_cell(account: dict[str, Any]) -> str:
daily_reset = short_time(account.get("daily_quota_reset_at"))
if daily_reset != "-":
return daily_reset
five_hour = window_for(account, "five-hour")
weekly = window_for(account, "weekly")
for window in (five_hour, weekly):
reset = short_time(window.get("reset"))
if reset != "-":
return reset
return "-"
def normalize_account_rows(payload: dict[str, Any], filter_text: str = "") -> list[dict[str, Any]]:
needle = filter_text.strip().lower()
rows = []
for account in payload.get("accounts") or []:
if not isinstance(account, dict):
continue
haystack = " ".join(
str(account.get(key) or "")
for key in (
"id",
"name",
"routing_group",
"provider",
"kind",
"status",
"account_type",
"platform",
"plan",
"daily_quota_timezone",
)
).lower()
if needle and needle not in haystack:
continue
routing_group = str(account.get("routing_group") or "-")
rows.append(
{
"id": as_int(account.get("id")),
"name": str(account.get("name") or ""),
"routing_group": routing_group,
"provider": provider_label(account),
"kind": str(account.get("kind") or "unknown"),
"kind_label": kind_label(account.get("kind")),
"status": str(account.get("status") or ""),
"account_type": str(account.get("account_type") or ""),
"plan": str(account.get("plan") or ""),
"priority": account.get("priority"),
"today_cost_usd": as_float(account.get("today_cost_usd")),
"today_actual_cost_usd": as_float(account.get("today_actual_cost_usd")),
"today_tokens": as_int(account.get("today_tokens")),
"today_requests": as_int(account.get("today_requests")),
"daily_quota": account.get("daily_quota") if isinstance(account.get("daily_quota"), dict) else {},
"daily_quota_limit": account.get("daily_quota_limit"),
"daily_quota_used": account.get("daily_quota_used"),
"daily_quota_remaining": account.get("daily_quota_remaining"),
"daily_quota_used_percent": account.get("daily_quota_used_percent"),
"daily_quota_reset_at": account.get("daily_quota_reset_at"),
"daily_quota_cell": daily_quota_cell(account),
"quota_used_percent_max": account.get("quota_used_percent_max"),
"five_hour": window_cell(account, "five-hour"),
"weekly": window_cell(account, "weekly"),
"reset": reset_cell(account),
"latest_usage_at": short_time(account.get("latest_usage_at") or account.get("last_used_at")),
"error": str(account.get("error") or ""),
"raw": account,
}
)
rows.sort(
key=lambda item: (
routing_group_sort_rank(item["routing_group"]),
1 if item["kind"] == "disabled" else 0,
-as_float(item["today_cost_usd"]),
-as_int(item["today_tokens"]),
-as_float(item["daily_quota_used_percent"]),
-as_float(item["quota_used_percent_max"]),
str(item["name"]).lower(),
)
)
return rows
def summary_line(payload: dict[str, Any]) -> str:
totals = payload.get("totals") if isinstance(payload.get("totals"), dict) else {}
generated = short_time(payload.get("generated_local") or payload.get("generated_at"))
cached = "cached" if payload.get("cached") else "live"
return (
f"{generated} {cached} | "
f"{as_int(totals.get('usable_accounts'))}/{as_int(totals.get('total_accounts'))} usable | "
f"today {format_cost(totals.get('today_cost_usd'))} | "
f"{format_count(totals.get('today_tokens'))} tokens | "
f"{format_count(totals.get('today_requests'))} req"
)
def print_once(payload: dict[str, Any], filter_text: str = "") -> None:
print(summary_line(payload))
print("name provider group daily today tokens req kind 5h 7d reset status")
for row in normalize_account_rows(payload, filter_text):
print(
f"{row['name'][:30]:<30} "
f"{row['provider']:<9} "
f"{row['routing_group']:<6} "
f"{row['daily_quota_cell']:<10} "
f"{format_cost(row['today_cost_usd']):<10} "
f"{format_count(row['today_tokens']):<8} "
f"{format_count(row['today_requests']):<5} "
f"{row['kind_label']:<9} "
f"{row['five_hour']:<9} "
f"{row['weekly']:<9} "
f"{row['reset']:<11} "
f"{row['status']}"
)
def run_textual(api_url: str, refresh_seconds: int, timeout: int) -> int:
try:
from textual.app import App, ComposeResult
from textual.widgets import DataTable, Footer, Header, Input, Static
except ImportError:
print("Textual is required. Run with: uv run --with textual python sub2api_quota_tui.py", file=sys.stderr)
return 2
class Sub2APIQuotaApp(App[None]):
CSS = """
#summary { height: 1; padding: 0 1; color: $accent; }
#filter { height: 3; }
#accounts { height: 1fr; }
#detail { height: 3; padding: 0 1; border-top: solid $panel; }
#status { height: 1; padding: 0 1; color: $text-muted; }
"""
BINDINGS = [
("q", "quit", "Quit"),
("r", "refresh", "Refresh"),
("/", "focus_filter", "Filter"),
]
def __init__(self) -> None:
super().__init__()
self.payload: dict[str, Any] = {}
self.rows: list[dict[str, Any]] = []
self.row_by_key: dict[str, dict[str, Any]] = {}
def compose(self) -> ComposeResult:
yield Header(show_clock=True)
yield Static("", id="summary")
yield Input(placeholder="filter", id="filter")
yield DataTable(id="accounts")
yield Static("", id="detail")
yield Static("", id="status")
yield Footer()
def on_mount(self) -> None:
table = self.query_one("#accounts", DataTable)
table.cursor_type = "row"
table.zebra_stripes = True
table.add_columns("Name", "Provider", "Group", "Daily", "Today", "Tokens", "Req", "Kind", "5h", "7d", "Reset", "Status")
self.refresh_data(refresh=True)
self.set_interval(refresh_seconds, self.refresh_data)
def action_refresh(self) -> None:
self.refresh_data(refresh=True)
def action_focus_filter(self) -> None:
self.query_one("#filter", Input).focus()
def on_input_changed(self, event: Input.Changed) -> None:
if event.input.id == "filter":
self.render_payload()
def refresh_data(self, refresh: bool = False) -> None:
status = self.query_one("#status", Static)
status.update("refreshing...")
try:
self.payload = fetch_payload(api_url, timeout, refresh=refresh)
self.render_payload()
status.update(f"source {self.payload.get('source_name') or '-'} | {api_url}")
except Exception as exc:
status.update(f"error: {exc}")
def render_payload(self) -> None:
filter_text = self.query_one("#filter", Input).value
self.rows = normalize_account_rows(self.payload, filter_text)
table = self.query_one("#accounts", DataTable)
table.clear()
self.row_by_key = {}
for row in self.rows:
key = str(row["id"])
self.row_by_key[key] = row
table.add_row(
row["name"],
row["provider"],
row["routing_group"],
row["daily_quota_cell"],
format_cost(row["today_cost_usd"]),
format_count(row["today_tokens"]),
format_count(row["today_requests"]),
row["kind_label"],
row["five_hour"],
row["weekly"],
row["reset"],
row["status"],
key=key,
)
self.query_one("#summary", Static).update(summary_line(self.payload))
if self.rows:
self.render_detail(self.rows[0])
else:
self.query_one("#detail", Static).update("no accounts")
def on_data_table_row_highlighted(self, event: DataTable.RowHighlighted) -> None:
key = str(event.row_key.value)
row = self.row_by_key.get(key)
if row:
self.render_detail(row)
def render_detail(self, row: dict[str, Any]) -> None:
raw = row.get("raw") if isinstance(row.get("raw"), dict) else {}
detail = (
f"{row['name']} | {row['provider']} | group {row['routing_group']} | {row['kind_label']} | {row['account_type']} | "
f"daily {row['daily_quota_cell']} ({format_percent(row['daily_quota_used_percent'])} used, "
f"{row['daily_quota_remaining'] if row['daily_quota_remaining'] is not None else '-'} left) | "
f"today {format_cost(row['today_cost_usd'])}, {format_count(row['today_tokens'])} tokens, "
f"{format_count(row['today_requests'])} req | latest {row['latest_usage_at']} | "
f"priority {row['priority'] if row['priority'] is not None else '-'}"
)
error = str(raw.get("error") or "").strip()
if error:
detail += f" | {error}"
self.query_one("#detail", Static).update(detail)
Sub2APIQuotaApp().run()
return 0
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="Sub2API quota and daily usage TUI")
parser.add_argument("--api-url", default=default_api_url())
parser.add_argument(
"--refresh-seconds",
type=int,
default=env_int("SUB2API_QUOTA_TUI_REFRESH_SECONDS", DEFAULT_REFRESH_SECONDS),
)
parser.add_argument("--timeout", type=int, default=env_int("SUB2API_QUOTA_TUI_TIMEOUT", DEFAULT_TIMEOUT_SECONDS))
parser.add_argument("--once", action="store_true", help="print one snapshot and exit")
parser.add_argument("--filter", default="", help="initial filter for --once output")
return parser
def main(argv: list[str] | None = None) -> int:
args = build_parser().parse_args(argv)
if args.once:
print_once(fetch_payload(args.api_url, args.timeout, refresh=True), args.filter)
return 0
return run_textual(args.api_url, max(1, args.refresh_seconds), max(1, args.timeout))
if __name__ == "__main__":
raise SystemExit(main())