#!/usr/bin/env python3 """Textual TUI for Sub2API quota and daily account usage.""" from __future__ import annotations import argparse import datetime as dt import json import os import sys import urllib.parse import urllib.request from pathlib import Path from typing import Any DEFAULT_API_URL = "http://127.0.0.1:18318/api/tui/accounts" DEFAULT_CONFIG_FILE = "~/.config/shusub2/api-url" DEFAULT_STATUS_CONFIG_FILE = "~/.config/shusub2/status-url" DEFAULT_REFRESH_SECONDS = 60 DEFAULT_TIMEOUT_SECONDS = 10 MONITOR_OK_STATUSES = {"operational", "ok", "success"} MONITOR_FAILED_STATUSES = {"error", "failed", "failure"} MONITOR_STOPWORDS = {"response", "responses", "monitor"} def env_int(name: str, default: int, *, minimum: int = 1) -> int: try: return max(minimum, int(os.environ.get(name, default))) except Exception: return default def configured_url(env_names: tuple[str, ...], config_path: str, default: str = "") -> str: for name in env_names: value = os.environ.get(name, "").strip() if value: return value config_file = Path(config_path).expanduser() try: for line in config_file.read_text(encoding="utf-8").splitlines(): value = line.strip() if value and not value.startswith("#"): return value except OSError: pass return default def default_api_url() -> str: return configured_url( ("SHUSUB2_API_URL", "SUB2API_QUOTA_TUI_API_URL"), os.environ.get("SHUSUB2_API_URL_FILE", DEFAULT_CONFIG_FILE), DEFAULT_API_URL, ) def default_status_url() -> str: return configured_url( ("SHUSUB2_STATUS_URL",), os.environ.get("SHUSUB2_STATUS_URL_FILE", DEFAULT_STATUS_CONFIG_FILE), ) def as_float(value: Any) -> float: try: if value is None or str(value).strip() == "": return 0.0 return float(value) except Exception: return 0.0 def as_int(value: Any) -> int: try: if value is None or str(value).strip() == "": return 0 return int(float(value)) except Exception: return 0 def format_cost(value: Any) -> str: amount = as_float(value) if amount == 0: return "$0" if abs(amount) < 0.01: return f"${amount:.6f}".rstrip("0").rstrip(".") if abs(amount) < 10: return f"${amount:.3f}".rstrip("0").rstrip(".") return f"${amount:.2f}".rstrip("0").rstrip(".") def format_count(value: Any) -> str: number = as_int(value) if abs(number) >= 1_000_000: return f"{number / 1_000_000:.1f}M".rstrip("0").rstrip(".") if abs(number) >= 1_000: return f"{number / 1_000:.1f}K".rstrip("0").rstrip(".") return str(number) def format_percent(value: Any) -> str: if value is None or str(value).strip() == "": return "-" percent = as_float(value) rounded = round(percent, 1) if rounded.is_integer(): return f"{int(rounded)}%" return f"{rounded}%" def parse_time(value: Any) -> dt.datetime | None: text = str(value or "").strip() if not text: return None try: normalized = text[:-1] + "+00:00" if text.endswith("Z") else text return dt.datetime.fromisoformat(normalized) except Exception: return None def short_time(value: Any) -> str: parsed = parse_time(value) if not parsed: return "-" return parsed.astimezone().strftime("%m-%d %H:%M") def add_refresh_param(api_url: str, refresh: bool) -> str: if not refresh: return api_url parsed = urllib.parse.urlparse(api_url) query = urllib.parse.parse_qsl(parsed.query, keep_blank_values=True) query = [(key, value) for key, value in query if key != "refresh"] query.append(("refresh", "1")) return urllib.parse.urlunparse(parsed._replace(query=urllib.parse.urlencode(query))) def fetch_payload(api_url: str, timeout: int, *, refresh: bool = False) -> dict[str, Any]: req = urllib.request.Request(add_refresh_param(api_url, refresh), headers={"Accept": "application/json"}) with urllib.request.urlopen(req, timeout=timeout) as response: data = json.loads(response.read().decode("utf-8")) if not isinstance(data, dict): raise RuntimeError("API did not return a JSON object") return data def fetch_optional_payload(url: str, timeout: int) -> tuple[dict[str, Any], str]: if not str(url or "").strip(): return {}, "" try: return fetch_payload(url, timeout), "" except Exception as exc: return {}, str(exc) def kind_label(kind: Any) -> str: return { "quota_limited": "quota", "pending_quota": "pending", "daily_limited": "daily", "pay_as_you_go": "usage", "disabled": "disabled", }.get(str(kind or ""), str(kind or "unknown")) def provider_label(account: dict[str, Any]) -> str: text = str(account.get("provider") or account.get("platform") or "").strip().lower() if not text: return "-" if "anthropic" in text or "claude" in text: return "anthropic" if "openai" in text or "chatgpt" in text: return "openai" return text def routing_group_sort_rank(value: Any) -> int: return {"sfast": 0, "fast": 1, "slow": 2, "id": 3, "": 4, "-": 4}.get(str(value or "").strip().lower(), 4) def status_kind(value: Any) -> str: text = str(value or "").strip().lower() if text in MONITOR_OK_STATUSES: return "ok" if text in MONITOR_FAILED_STATUSES: return "failed" return "unknown" def format_latency(value: Any) -> str: number = as_int(value) return "-" if number <= 0 else f"{number}ms" def monitor_items(status_payload: dict[str, Any]) -> list[dict[str, Any]]: monitors = status_payload.get("channel_monitors") if isinstance(status_payload.get("channel_monitors"), dict) else {} items = [] for item in monitors.get("items") or []: if isinstance(item, dict): items.append(item) return items def monitor_summary(status_payload: dict[str, Any], error: str = "") -> str: if error: return f"monitors error: {error}" monitors = status_payload.get("channel_monitors") if isinstance(status_payload.get("channel_monitors"), dict) else {} if not monitors: return "monitors -" enabled = as_int(monitors.get("enabled")) ok = as_int(monitors.get("latest_ok")) failed = as_int(monitors.get("latest_failed")) unknown = as_int(monitors.get("latest_unknown")) checked = short_time(monitors.get("latest_checked_max")) return f"monitors {ok} ok / {failed} failed / {unknown} unknown ({enabled} enabled, {checked})" def text_tokens(value: Any) -> set[str]: tokens = set() for token in "".join(ch.lower() if ch.isalnum() else " " for ch in str(value or "")).split(): if token and token not in MONITOR_STOPWORDS and not token.isdigit(): tokens.add(token) return tokens def matching_monitor(row: dict[str, Any], status_payload: dict[str, Any]) -> dict[str, Any] | None: account_tokens = text_tokens(row.get("name")) if not account_tokens: return None best: tuple[int, dict[str, Any]] | None = None for item in monitor_items(status_payload): monitor_tokens = text_tokens(item.get("name")) if not monitor_tokens: continue overlap = account_tokens & monitor_tokens if not overlap: continue subset_bonus = 10 if account_tokens <= monitor_tokens or monitor_tokens <= account_tokens else 0 score = subset_bonus + len(overlap) if best is None or score > best[0]: best = (score, item) return None if best is None else best[1] def monitor_detail(row: dict[str, Any], status_payload: dict[str, Any]) -> str: item = matching_monitor(row, status_payload) if not item: return "monitor -" status = str(item.get("latest_status") or "unknown") latency = format_latency(item.get("latency_ms")) checked = short_time(item.get("checked_at")) message = str(item.get("message") or "").strip() detail = f"monitor {status} {latency} checked {checked}" if message: detail += f" {message}" return detail def window_for(account: dict[str, Any], window_id: str) -> dict[str, Any]: for window in account.get("windows") or []: if isinstance(window, dict) and window.get("id") == window_id: return window return {} def window_cell(account: dict[str, Any], window_id: str) -> str: window = window_for(account, window_id) if not window or window.get("used_percent") is None: return "-" return f"{format_percent(window.get('used_percent'))}/{format_percent(window.get('remaining_percent'))}" def daily_quota_cell(account: dict[str, Any]) -> str: limit = account.get("daily_quota_limit") used = account.get("daily_quota_used") if limit is None and used is None: return "-" if limit is None: return str(used) return f"{used or 0}/{limit}" def reset_cell(account: dict[str, Any]) -> str: daily_reset = short_time(account.get("daily_quota_reset_at")) if daily_reset != "-": return daily_reset five_hour = window_for(account, "five-hour") weekly = window_for(account, "weekly") for window in (five_hour, weekly): reset = short_time(window.get("reset")) if reset != "-": return reset return "-" def normalize_account_rows(payload: dict[str, Any], filter_text: str = "") -> list[dict[str, Any]]: needle = filter_text.strip().lower() rows = [] for account in payload.get("accounts") or []: if not isinstance(account, dict): continue haystack = " ".join( str(account.get(key) or "") for key in ( "id", "name", "routing_group", "provider", "kind", "status", "account_type", "platform", "plan", "daily_quota_timezone", ) ).lower() if needle and needle not in haystack: continue routing_group = str(account.get("routing_group") or "-") rows.append( { "id": as_int(account.get("id")), "name": str(account.get("name") or ""), "routing_group": routing_group, "provider": provider_label(account), "kind": str(account.get("kind") or "unknown"), "kind_label": kind_label(account.get("kind")), "status": str(account.get("status") or ""), "account_type": str(account.get("account_type") or ""), "plan": str(account.get("plan") or ""), "priority": account.get("priority"), "today_cost_usd": as_float(account.get("today_cost_usd")), "today_actual_cost_usd": as_float(account.get("today_actual_cost_usd")), "today_tokens": as_int(account.get("today_tokens")), "today_requests": as_int(account.get("today_requests")), "daily_quota": account.get("daily_quota") if isinstance(account.get("daily_quota"), dict) else {}, "daily_quota_limit": account.get("daily_quota_limit"), "daily_quota_used": account.get("daily_quota_used"), "daily_quota_remaining": account.get("daily_quota_remaining"), "daily_quota_used_percent": account.get("daily_quota_used_percent"), "daily_quota_reset_at": account.get("daily_quota_reset_at"), "daily_quota_cell": daily_quota_cell(account), "quota_used_percent_max": account.get("quota_used_percent_max"), "five_hour": window_cell(account, "five-hour"), "weekly": window_cell(account, "weekly"), "reset": reset_cell(account), "latest_usage_at": short_time(account.get("latest_usage_at") or account.get("last_used_at")), "error": str(account.get("error") or ""), "raw": account, } ) rows.sort( key=lambda item: ( routing_group_sort_rank(item["routing_group"]), 1 if item["kind"] == "disabled" else 0, -as_float(item["today_cost_usd"]), -as_int(item["today_tokens"]), -as_float(item["daily_quota_used_percent"]), -as_float(item["quota_used_percent_max"]), str(item["name"]).lower(), ) ) return rows def summary_line(payload: dict[str, Any]) -> str: totals = payload.get("totals") if isinstance(payload.get("totals"), dict) else {} generated = short_time(payload.get("generated_local") or payload.get("generated_at")) cached = "cached" if payload.get("cached") else "live" return ( f"{generated} {cached} | " f"{as_int(totals.get('usable_accounts'))}/{as_int(totals.get('total_accounts'))} usable | " f"today {format_cost(totals.get('today_cost_usd'))} | " f"{format_count(totals.get('today_tokens'))} tokens | " f"{format_count(totals.get('today_requests'))} req" ) def print_once(payload: dict[str, Any], filter_text: str = "", status_payload: dict[str, Any] | None = None, status_error: str = "") -> None: print(summary_line(payload)) if status_payload or status_error: print(monitor_summary(status_payload or {}, status_error)) print("name provider group daily today tokens req kind 5h 7d reset status") for row in normalize_account_rows(payload, filter_text): print( f"{row['name'][:30]:<30} " f"{row['provider']:<9} " f"{row['routing_group']:<6} " f"{row['daily_quota_cell']:<10} " f"{format_cost(row['today_cost_usd']):<10} " f"{format_count(row['today_tokens']):<8} " f"{format_count(row['today_requests']):<5} " f"{row['kind_label']:<9} " f"{row['five_hour']:<9} " f"{row['weekly']:<9} " f"{row['reset']:<11} " f"{row['status']}" ) def run_textual(api_url: str, status_url: str, refresh_seconds: int, timeout: int) -> int: try: from textual.app import App, ComposeResult from textual.widgets import DataTable, Footer, Header, Input, Static except ImportError: print("Textual is required. Run with: uv run --with textual python sub2api_quota_tui.py", file=sys.stderr) return 2 class Sub2APIQuotaApp(App[None]): CSS = """ #summary { height: 1; padding: 0 1; color: $accent; } #filter { height: 3; } #accounts { height: 1fr; } #detail { height: 3; padding: 0 1; border-top: solid $panel; } #status { height: 1; padding: 0 1; color: $text-muted; } """ BINDINGS = [ ("q", "quit", "Quit"), ("r", "refresh", "Refresh"), ("/", "focus_filter", "Filter"), ] def __init__(self) -> None: super().__init__() self.payload: dict[str, Any] = {} self.status_payload: dict[str, Any] = {} self.status_error = "" self.rows: list[dict[str, Any]] = [] self.row_by_key: dict[str, dict[str, Any]] = {} def compose(self) -> ComposeResult: yield Header(show_clock=True) yield Static("", id="summary") yield Input(placeholder="filter", id="filter") yield DataTable(id="accounts") yield Static("", id="detail") yield Static("", id="status") yield Footer() def on_mount(self) -> None: table = self.query_one("#accounts", DataTable) table.cursor_type = "row" table.zebra_stripes = True table.add_columns("Name", "Provider", "Group", "Daily", "Today", "Tokens", "Req", "Kind", "5h", "7d", "Reset", "Status") self.refresh_data(refresh=True) self.set_interval(refresh_seconds, self.refresh_data) def action_refresh(self) -> None: self.refresh_data(refresh=True) def action_focus_filter(self) -> None: self.query_one("#filter", Input).focus() def on_input_changed(self, event: Input.Changed) -> None: if event.input.id == "filter": self.render_payload() def refresh_data(self, refresh: bool = False) -> None: status = self.query_one("#status", Static) status.update("refreshing...") try: self.payload = fetch_payload(api_url, timeout, refresh=refresh) self.status_payload, self.status_error = fetch_optional_payload(status_url, timeout) self.render_payload() status_bits = [monitor_summary(self.status_payload, self.status_error), f"source {self.payload.get('source_name') or '-'}"] status.update(" | ".join(bit for bit in status_bits if bit) + f" | {api_url}") except Exception as exc: status.update(f"error: {exc}") def render_payload(self) -> None: filter_text = self.query_one("#filter", Input).value self.rows = normalize_account_rows(self.payload, filter_text) table = self.query_one("#accounts", DataTable) table.clear() self.row_by_key = {} for row in self.rows: key = str(row["id"]) self.row_by_key[key] = row table.add_row( row["name"], row["provider"], row["routing_group"], row["daily_quota_cell"], format_cost(row["today_cost_usd"]), format_count(row["today_tokens"]), format_count(row["today_requests"]), row["kind_label"], row["five_hour"], row["weekly"], row["reset"], row["status"], key=key, ) self.query_one("#summary", Static).update(summary_line(self.payload)) if self.rows: self.render_detail(self.rows[0]) else: self.query_one("#detail", Static).update("no accounts") def on_data_table_row_highlighted(self, event: DataTable.RowHighlighted) -> None: key = str(event.row_key.value) row = self.row_by_key.get(key) if row: self.render_detail(row) def render_detail(self, row: dict[str, Any]) -> None: raw = row.get("raw") if isinstance(row.get("raw"), dict) else {} detail = ( f"{row['name']} | {row['provider']} | group {row['routing_group']} | {row['kind_label']} | {row['account_type']} | " f"daily {row['daily_quota_cell']} ({format_percent(row['daily_quota_used_percent'])} used, " f"{row['daily_quota_remaining'] if row['daily_quota_remaining'] is not None else '-'} left) | " f"today {format_cost(row['today_cost_usd'])}, {format_count(row['today_tokens'])} tokens, " f"{format_count(row['today_requests'])} req | latest {row['latest_usage_at']} | " f"priority {row['priority'] if row['priority'] is not None else '-'} | " f"{monitor_detail(row, self.status_payload)}" ) error = str(raw.get("error") or "").strip() if error: detail += f" | {error}" self.query_one("#detail", Static).update(detail) Sub2APIQuotaApp().run() return 0 def build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser(description="Sub2API quota and daily usage TUI") parser.add_argument("--api-url", default=default_api_url()) parser.add_argument("--status-url", default=default_status_url(), help="optional sub2api-status /api/status URL for channel monitor health") parser.add_argument( "--refresh-seconds", type=int, default=env_int("SUB2API_QUOTA_TUI_REFRESH_SECONDS", DEFAULT_REFRESH_SECONDS), ) parser.add_argument("--timeout", type=int, default=env_int("SUB2API_QUOTA_TUI_TIMEOUT", DEFAULT_TIMEOUT_SECONDS)) parser.add_argument("--once", action="store_true", help="print one snapshot and exit") parser.add_argument("--filter", default="", help="initial filter for --once output") return parser def main(argv: list[str] | None = None) -> int: args = build_parser().parse_args(argv) if args.once: status_payload, status_error = fetch_optional_payload(args.status_url, args.timeout) print_once(fetch_payload(args.api_url, args.timeout, refresh=True), args.filter, status_payload, status_error) return 0 return run_textual(args.api_url, args.status_url, max(1, args.refresh_seconds), max(1, args.timeout)) if __name__ == "__main__": raise SystemExit(main())