#!/usr/bin/env python3 """Textual TUI for Sub2API quota and daily account usage.""" from __future__ import annotations import argparse import datetime as dt import importlib.metadata import json import os import re import sys import urllib.parse import urllib.request from pathlib import Path from typing import Any APP_NAME = "shusub2" FALLBACK_VERSION = "0.1.6" DEFAULT_API_URL = "http://127.0.0.1:18318/api/tui/accounts" DEFAULT_CONFIG_FILE = "~/.config/shusub2/api-url" DEFAULT_STATUS_CONFIG_FILE = "~/.config/shusub2/status-url" DEFAULT_VERSION_CHECK_URL = "https://gitea.shujk.top/shujakuin/shusub2/raw/branch/main/pyproject.toml" DEFAULT_REFRESH_SECONDS = 60 DEFAULT_TIMEOUT_SECONDS = 10 DEFAULT_VERSION_CHECK_TIMEOUT_SECONDS = 2 MONITOR_OK_STATUSES = {"operational", "ok", "success"} MONITOR_FAILED_STATUSES = {"error", "failed", "failure"} MONITOR_STOPWORDS = {"response", "responses", "monitor"} INSTALL_COMMAND = "uv tool install --force git+https://gitea.shujk.top/shujakuin/shusub2.git" def env_int(name: str, default: int, *, minimum: int = 1) -> int: try: return max(minimum, int(os.environ.get(name, default))) except Exception: return default def configured_url(env_names: tuple[str, ...], config_path: str, default: str = "") -> str: for name in env_names: value = os.environ.get(name, "").strip() if value: return value config_file = Path(config_path).expanduser() try: for line in config_file.read_text(encoding="utf-8").splitlines(): value = line.strip() if value and not value.startswith("#"): return value except OSError: pass return default def default_api_url() -> str: return configured_url( ("SHUSUB2_API_URL", "SUB2API_QUOTA_TUI_API_URL"), os.environ.get("SHUSUB2_API_URL_FILE", DEFAULT_CONFIG_FILE), DEFAULT_API_URL, ) def default_status_url() -> str: return configured_url( ("SHUSUB2_STATUS_URL",), os.environ.get("SHUSUB2_STATUS_URL_FILE", DEFAULT_STATUS_CONFIG_FILE), ) def inferred_status_url(api_url: str) -> str: parsed = urllib.parse.urlparse(str(api_url or "").strip()) if not parsed.scheme or not parsed.netloc: return "" path = parsed.path.rstrip("/") if path.endswith("/api/tui/accounts"): status_path = path[: -len("/api/tui/accounts")] + "/api/status" elif path.endswith("/api/accounts"): status_path = path[: -len("/api/accounts")] + "/api/status" else: return "" return urllib.parse.urlunparse(parsed._replace(path=status_path, params="", query="", fragment="")) def current_version() -> str: try: return importlib.metadata.version(APP_NAME) except importlib.metadata.PackageNotFoundError: return FALLBACK_VERSION except Exception: return FALLBACK_VERSION def parse_version(value: Any) -> tuple[int, ...]: parts = [] for part in re.split(r"[^0-9]+", str(value or "")): if part: parts.append(int(part)) return tuple(parts) def version_is_newer(latest: str, current: str) -> bool: latest_parts = parse_version(latest) current_parts = parse_version(current) width = max(len(latest_parts), len(current_parts), 1) return latest_parts + (0,) * (width - len(latest_parts)) > current_parts + (0,) * (width - len(current_parts)) def latest_version_from_text(text: str) -> str: match = re.search(r'(?m)^version\s*=\s*"([^"]+)"', text) return "" if not match else match.group(1).strip() def fetch_latest_version(url: str, timeout: int) -> str: req = urllib.request.Request(url, headers={"Accept": "text/plain"}) with urllib.request.urlopen(req, timeout=timeout) as response: return latest_version_from_text(response.read(65536).decode("utf-8", errors="replace")) def version_update_message(latest: str, current: str | None = None) -> str: current = current or current_version() if not latest or not version_is_newer(latest, current): return "" return f"update available: {APP_NAME} {current} -> {latest}; run `{INSTALL_COMMAND}`" def check_version_update(url: str, timeout: int, *, disabled: bool = False) -> str: if disabled or os.environ.get("SHUSUB2_NO_VERSION_CHECK", "").strip().lower() in {"1", "true", "yes", "on"}: return "" try: return version_update_message(fetch_latest_version(url, timeout)) except Exception: return "" def as_float(value: Any) -> float: try: if value is None or str(value).strip() == "": return 0.0 return float(value) except Exception: return 0.0 def as_int(value: Any) -> int: try: if value is None or str(value).strip() == "": return 0 return int(float(value)) except Exception: return 0 def format_cost(value: Any) -> str: amount = as_float(value) if amount == 0: return "$0" if abs(amount) < 0.01: return f"${amount:.6f}".rstrip("0").rstrip(".") if abs(amount) < 10: return f"${amount:.3f}".rstrip("0").rstrip(".") return f"${amount:.2f}".rstrip("0").rstrip(".") def format_count(value: Any) -> str: number = as_int(value) if abs(number) >= 1_000_000: return f"{number / 1_000_000:.1f}M".rstrip("0").rstrip(".") if abs(number) >= 1_000: return f"{number / 1_000:.1f}K".rstrip("0").rstrip(".") return str(number) def format_percent(value: Any) -> str: if value is None or str(value).strip() == "": return "-" percent = as_float(value) rounded = round(percent, 1) if rounded.is_integer(): return f"{int(rounded)}%" return f"{rounded}%" def parse_time(value: Any) -> dt.datetime | None: text = str(value or "").strip() if not text: return None try: normalized = text[:-1] + "+00:00" if text.endswith("Z") else text return dt.datetime.fromisoformat(normalized) except Exception: return None def short_time(value: Any) -> str: parsed = parse_time(value) if not parsed: return "-" return parsed.astimezone().strftime("%m-%d %H:%M") def add_refresh_param(api_url: str, refresh: bool) -> str: if not refresh: return api_url parsed = urllib.parse.urlparse(api_url) query = urllib.parse.parse_qsl(parsed.query, keep_blank_values=True) query = [(key, value) for key, value in query if key != "refresh"] query.append(("refresh", "1")) return urllib.parse.urlunparse(parsed._replace(query=urllib.parse.urlencode(query))) def fetch_payload(api_url: str, timeout: int, *, refresh: bool = False) -> dict[str, Any]: req = urllib.request.Request(add_refresh_param(api_url, refresh), headers={"Accept": "application/json"}) with urllib.request.urlopen(req, timeout=timeout) as response: data = json.loads(response.read().decode("utf-8")) if not isinstance(data, dict): raise RuntimeError("API did not return a JSON object") return data def fetch_optional_payload(url: str, timeout: int) -> tuple[dict[str, Any], str]: if not str(url or "").strip(): return {}, "" try: return fetch_payload(url, timeout), "" except Exception as exc: return {}, str(exc) def kind_label(kind: Any) -> str: return { "quota_limited": "quota", "pending_quota": "pending", "daily_limited": "daily", "pay_as_you_go": "usage", "disabled": "disabled", }.get(str(kind or ""), str(kind or "unknown")) def provider_label(account: dict[str, Any]) -> str: text = str(account.get("provider") or account.get("platform") or "").strip().lower() if not text: return "-" if "anthropic" in text or "claude" in text: return "anthropic" if "openai" in text or "chatgpt" in text: return "openai" return text def routing_group_sort_rank(value: Any) -> int: return {"sfast": 0, "fast": 1, "slow": 2, "id": 3, "": 4, "-": 4}.get(str(value or "").strip().lower(), 4) def status_kind(value: Any) -> str: text = str(value or "").strip().lower() if text in MONITOR_OK_STATUSES: return "ok" if text in MONITOR_FAILED_STATUSES: return "failed" return "unknown" def format_latency(value: Any) -> str: number = as_int(value) return "-" if number <= 0 else f"{number}ms" def monitor_items(status_payload: dict[str, Any]) -> list[dict[str, Any]]: monitors = status_payload.get("channel_monitors") if isinstance(status_payload.get("channel_monitors"), dict) else {} items = [] for item in monitors.get("items") or []: if isinstance(item, dict): items.append(item) return items def monitor_summary(status_payload: dict[str, Any], error: str = "") -> str: if error: return f"monitors error: {error}" monitors = status_payload.get("channel_monitors") if isinstance(status_payload.get("channel_monitors"), dict) else {} if not monitors: return "monitors -" enabled = as_int(monitors.get("enabled")) ok = as_int(monitors.get("latest_ok")) failed = as_int(monitors.get("latest_failed")) unknown = as_int(monitors.get("latest_unknown")) checked = short_time(monitors.get("latest_checked_max")) return f"monitors {ok} ok / {failed} failed / {unknown} unknown ({enabled} enabled, {checked})" def text_tokens(value: Any) -> set[str]: tokens = set() for token in "".join(ch.lower() if ch.isalnum() else " " for ch in str(value or "")).split(): if token and token not in MONITOR_STOPWORDS and not token.isdigit(): tokens.add(token) return tokens def matching_monitor(row: dict[str, Any], status_payload: dict[str, Any]) -> dict[str, Any] | None: base_url_hash = str(row.get("base_url_hash") or "").strip() if base_url_hash: for item in monitor_items(status_payload): if str(item.get("base_url_hash") or "").strip() == base_url_hash: return item account_tokens = text_tokens(row.get("name")) if not account_tokens: return None best: tuple[int, dict[str, Any]] | None = None for item in monitor_items(status_payload): monitor_tokens = text_tokens(item.get("name")) if not monitor_tokens: continue overlap = account_tokens & monitor_tokens if not overlap: continue subset_bonus = 10 if account_tokens <= monitor_tokens or monitor_tokens <= account_tokens else 0 score = subset_bonus + len(overlap) if best is None or score > best[0]: best = (score, item) return None if best is None else best[1] def monitor_detail(row: dict[str, Any], status_payload: dict[str, Any]) -> str: item = matching_monitor(row, status_payload) if not item: return "monitor -" status = str(item.get("latest_status") or "unknown") latency = format_latency(item.get("latency_ms")) checked = short_time(item.get("checked_at")) message = str(item.get("message") or "").strip() detail = f"monitor {status} {latency} checked {checked}" if message: detail += f" {message}" return detail def monitor_availability(row: dict[str, Any], status_payload: dict[str, Any]) -> str: item = matching_monitor(row, status_payload) if not item: return "-" if item.get("enabled") is False: return "disabled" raw_status = str(item.get("latest_status") or "").strip().lower() if not raw_status: return "unknown" kind = status_kind(raw_status) if kind == "ok": return "ok" if kind == "failed": return "failed" return raw_status def window_for(account: dict[str, Any], window_id: str) -> dict[str, Any]: for window in account.get("windows") or []: if isinstance(window, dict) and window.get("id") == window_id: return window return {} def window_cell(account: dict[str, Any], window_id: str) -> str: window = window_for(account, window_id) if not window or window.get("used_percent") is None: return "-" return f"{format_percent(window.get('used_percent'))}/{format_percent(window.get('remaining_percent'))}" def daily_quota_cell(account: dict[str, Any]) -> str: limit = account.get("daily_quota_limit") used = account.get("daily_quota_used") if limit is None: return "-" return f"{used or 0}/{limit}" def reset_cell(account: dict[str, Any]) -> str: daily_reset = short_time(account.get("daily_quota_reset_at")) if daily_reset != "-": return daily_reset five_hour = window_for(account, "five-hour") weekly = window_for(account, "weekly") for window in (five_hour, weekly): reset = short_time(window.get("reset")) if reset != "-": return reset return "-" def normalize_account_rows(payload: dict[str, Any], filter_text: str = "") -> list[dict[str, Any]]: needle = filter_text.strip().lower() rows = [] for account in payload.get("accounts") or []: if not isinstance(account, dict): continue haystack = " ".join( str(account.get(key) or "") for key in ( "id", "name", "routing_group", "provider", "kind", "status", "account_type", "platform", "plan", "daily_quota_timezone", ) ).lower() if needle and needle not in haystack: continue routing_group = str(account.get("routing_group") or "-") rows.append( { "id": as_int(account.get("id")), "name": str(account.get("name") or ""), "routing_group": routing_group, "base_url_hash": str(account.get("base_url_hash") or ""), "provider": provider_label(account), "kind": str(account.get("kind") or "unknown"), "kind_label": kind_label(account.get("kind")), "status": str(account.get("status") or ""), "account_type": str(account.get("account_type") or ""), "plan": str(account.get("plan") or ""), "priority": account.get("priority"), "today_cost_usd": as_float(account.get("today_cost_usd")), "today_actual_cost_usd": as_float(account.get("today_actual_cost_usd")), "today_tokens": as_int(account.get("today_tokens")), "today_requests": as_int(account.get("today_requests")), "daily_quota": account.get("daily_quota") if isinstance(account.get("daily_quota"), dict) else {}, "daily_quota_limit": account.get("daily_quota_limit"), "daily_quota_used": account.get("daily_quota_used"), "daily_quota_remaining": account.get("daily_quota_remaining"), "daily_quota_used_percent": account.get("daily_quota_used_percent"), "daily_quota_reset_at": account.get("daily_quota_reset_at"), "daily_quota_cell": daily_quota_cell(account), "quota_used_percent_max": account.get("quota_used_percent_max"), "five_hour": window_cell(account, "five-hour"), "weekly": window_cell(account, "weekly"), "reset": reset_cell(account), "latest_usage_at": short_time(account.get("latest_usage_at") or account.get("last_used_at")), "error": str(account.get("error") or ""), "raw": account, } ) rows.sort( key=lambda item: ( routing_group_sort_rank(item["routing_group"]), 1 if item["kind"] == "disabled" else 0, -as_float(item["today_cost_usd"]), -as_int(item["today_tokens"]), -as_float(item["daily_quota_used_percent"]), -as_float(item["quota_used_percent_max"]), str(item["name"]).lower(), ) ) return rows def summary_line(payload: dict[str, Any]) -> str: totals = payload.get("totals") if isinstance(payload.get("totals"), dict) else {} generated = short_time(payload.get("generated_local") or payload.get("generated_at")) cached = "cached" if payload.get("cached") else "live" return ( f"{generated} {cached} | " f"{as_int(totals.get('usable_accounts'))}/{as_int(totals.get('total_accounts'))} usable | " f"today {format_cost(totals.get('today_cost_usd'))} | " f"{format_count(totals.get('today_tokens'))} tokens | " f"{format_count(totals.get('today_requests'))} req" ) def print_once(payload: dict[str, Any], filter_text: str = "", status_payload: dict[str, Any] | None = None, status_error: str = "") -> None: print(summary_line(payload)) if status_payload or status_error: print(monitor_summary(status_payload or {}, status_error)) print("name provider group daily today tokens req kind 5h 7d reset status availability") for row in normalize_account_rows(payload, filter_text): print( f"{row['name'][:30]:<30} " f"{row['provider']:<9} " f"{row['routing_group']:<6} " f"{row['daily_quota_cell']:<10} " f"{format_cost(row['today_cost_usd']):<10} " f"{format_count(row['today_tokens']):<8} " f"{format_count(row['today_requests']):<5} " f"{row['kind_label']:<9} " f"{row['five_hour']:<9} " f"{row['weekly']:<9} " f"{row['reset']:<11} " f"{row['status']:<10} " f"{monitor_availability(row, status_payload or {})}" ) def run_textual(api_url: str, status_url: str, refresh_seconds: int, timeout: int, version_message: str = "") -> int: try: from textual.app import App, ComposeResult from textual.widgets import DataTable, Footer, Header, Input, Static except ImportError: print("Textual is required. Run with: uv run --with textual python sub2api_quota_tui.py", file=sys.stderr) return 2 class Sub2APIQuotaApp(App[None]): CSS = """ #summary { height: 1; padding: 0 1; color: $accent; } #filter { height: 3; } #accounts { height: 1fr; } #detail { height: 3; padding: 0 1; border-top: solid $panel; } #status { height: 1; padding: 0 1; color: $text-muted; } """ BINDINGS = [ ("q", "quit", "Quit"), ("r", "refresh", "Refresh"), ("/", "focus_filter", "Filter"), ] def __init__(self) -> None: super().__init__() self.payload: dict[str, Any] = {} self.status_payload: dict[str, Any] = {} self.status_error = "" self.rows: list[dict[str, Any]] = [] self.row_by_key: dict[str, dict[str, Any]] = {} def compose(self) -> ComposeResult: yield Header(show_clock=True) yield Static("", id="summary") yield Input(placeholder="filter", id="filter") yield DataTable(id="accounts") yield Static("", id="detail") yield Static("", id="status") yield Footer() def on_mount(self) -> None: table = self.query_one("#accounts", DataTable) table.cursor_type = "row" table.zebra_stripes = True table.add_columns("Name", "Provider", "Group", "Daily", "Today", "Tokens", "Req", "Kind", "5h", "7d", "Reset", "Status", "Availability") self.refresh_data(refresh=True) self.set_interval(refresh_seconds, self.refresh_data) def action_refresh(self) -> None: self.refresh_data(refresh=True) def action_focus_filter(self) -> None: self.query_one("#filter", Input).focus() def on_input_changed(self, event: Input.Changed) -> None: if event.input.id == "filter": self.render_payload() def refresh_data(self, refresh: bool = False) -> None: status = self.query_one("#status", Static) status.update("refreshing...") try: self.payload = fetch_payload(api_url, timeout, refresh=refresh) self.status_payload, self.status_error = fetch_optional_payload(status_url, timeout) self.render_payload() status_bits = [version_message, monitor_summary(self.status_payload, self.status_error), f"source {self.payload.get('source_name') or '-'}"] status.update(" | ".join(bit for bit in status_bits if bit) + f" | {api_url}") except Exception as exc: status.update(f"error: {exc}") def render_payload(self) -> None: filter_text = self.query_one("#filter", Input).value self.rows = normalize_account_rows(self.payload, filter_text) table = self.query_one("#accounts", DataTable) table.clear() self.row_by_key = {} for row in self.rows: key = str(row["id"]) self.row_by_key[key] = row table.add_row( row["name"], row["provider"], row["routing_group"], row["daily_quota_cell"], format_cost(row["today_cost_usd"]), format_count(row["today_tokens"]), format_count(row["today_requests"]), row["kind_label"], row["five_hour"], row["weekly"], row["reset"], row["status"], monitor_availability(row, self.status_payload), key=key, ) self.query_one("#summary", Static).update(summary_line(self.payload)) if self.rows: self.render_detail(self.rows[0]) else: self.query_one("#detail", Static).update("no accounts") def on_data_table_row_highlighted(self, event: DataTable.RowHighlighted) -> None: key = str(event.row_key.value) row = self.row_by_key.get(key) if row: self.render_detail(row) def render_detail(self, row: dict[str, Any]) -> None: raw = row.get("raw") if isinstance(row.get("raw"), dict) else {} detail = ( f"{row['name']} | {row['provider']} | group {row['routing_group']} | {row['kind_label']} | {row['account_type']} | " f"daily {row['daily_quota_cell']} ({format_percent(row['daily_quota_used_percent'])} used, " f"{row['daily_quota_remaining'] if row['daily_quota_remaining'] is not None else '-'} left) | " f"today {format_cost(row['today_cost_usd'])}, {format_count(row['today_tokens'])} tokens, " f"{format_count(row['today_requests'])} req | latest {row['latest_usage_at']} | " f"priority {row['priority'] if row['priority'] is not None else '-'} | " f"{monitor_detail(row, self.status_payload)}" ) error = str(raw.get("error") or "").strip() if error: detail += f" | {error}" self.query_one("#detail", Static).update(detail) Sub2APIQuotaApp().run() return 0 def build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser(description="Sub2API quota and daily usage TUI") parser.add_argument("--api-url", default=default_api_url()) parser.add_argument("--status-url", default=default_status_url(), help="optional sub2api-status /api/status URL for channel monitor health") parser.add_argument("--version-check-url", default=os.environ.get("SHUSUB2_VERSION_CHECK_URL", DEFAULT_VERSION_CHECK_URL)) parser.add_argument( "--version-check-timeout", type=int, default=env_int("SHUSUB2_VERSION_CHECK_TIMEOUT", DEFAULT_VERSION_CHECK_TIMEOUT_SECONDS), ) parser.add_argument("--no-version-check", action="store_true", default=os.environ.get("SHUSUB2_NO_VERSION_CHECK", "").strip().lower() in {"1", "true", "yes", "on"}) parser.add_argument( "--refresh-seconds", type=int, default=env_int("SUB2API_QUOTA_TUI_REFRESH_SECONDS", DEFAULT_REFRESH_SECONDS), ) parser.add_argument("--timeout", type=int, default=env_int("SUB2API_QUOTA_TUI_TIMEOUT", DEFAULT_TIMEOUT_SECONDS)) parser.add_argument("--once", action="store_true", help="print one snapshot and exit") parser.add_argument("--filter", default="", help="initial filter for --once output") return parser def main(argv: list[str] | None = None) -> int: args = build_parser().parse_args(argv) status_url = str(args.status_url or "").strip() or inferred_status_url(args.api_url) version_message = check_version_update( args.version_check_url, max(1, args.version_check_timeout), disabled=bool(args.no_version_check), ) if args.once: if version_message: print(version_message) status_payload, status_error = fetch_optional_payload(status_url, args.timeout) print_once(fetch_payload(args.api_url, args.timeout, refresh=True), args.filter, status_payload, status_error) return 0 return run_textual(args.api_url, status_url, max(1, args.refresh_seconds), max(1, args.timeout), version_message) if __name__ == "__main__": raise SystemExit(main())