704 lines
27 KiB
Python
704 lines
27 KiB
Python
#!/usr/bin/env python3
|
|
"""Textual TUI for Sub2API quota and daily account usage."""
|
|
from __future__ import annotations

import argparse
import datetime as dt
import importlib.metadata
import json
import math
import os
import re
import subprocess
import sys
import urllib.parse
import urllib.request
from pathlib import Path
from typing import Any
|
|
|
|
|
|
# Distribution name; also used to look up the installed version.
APP_NAME = "shusub2"
# Version reported when package metadata is unavailable.
FALLBACK_VERSION = "0.1.7"

# Endpoint and config-file defaults (paths are expanded with ``~`` at use time).
DEFAULT_API_URL = "http://127.0.0.1:18318/api/tui/accounts"
DEFAULT_CONFIG_FILE = "~/.config/shusub2/api-url"
DEFAULT_STATUS_CONFIG_FILE = "~/.config/shusub2/status-url"
DEFAULT_VERSION_CHECK_URL = "https://gitea.shujk.top/shujakuin/shusub2/raw/branch/main/pyproject.toml"

# Timing defaults, in seconds.
DEFAULT_REFRESH_SECONDS = 60
DEFAULT_TIMEOUT_SECONDS = 10
DEFAULT_VERSION_CHECK_TIMEOUT_SECONDS = 2

# Monitor status vocabularies (compared against lower-cased status text).
MONITOR_OK_STATUSES = {"operational", "ok", "success"}
MONITOR_FAILED_STATUSES = {"error", "failed", "failure"}
# Words ignored when matching account names against monitor names.
MONITOR_STOPWORDS = {"response", "responses", "monitor"}

# The argv list is the single source of truth; the display string is derived
# from it so the command shown to users can never drift from the one executed.
INSTALL_COMMAND_ARGS = ["uv", "tool", "install", "--force", "git+https://gitea.shujk.top/shujakuin/shusub2.git"]
INSTALL_COMMAND = " ".join(INSTALL_COMMAND_ARGS)
|
|
|
|
|
|
def env_int(name: str, default: int, *, minimum: int = 1) -> int:
    """Read an integer from environment variable *name*.

    Returns ``max(minimum, value)`` when the variable (or, if it is unset,
    *default*) converts to an int. When the conversion fails, *default* is
    returned unchanged.
    """
    try:
        return max(minimum, int(os.environ.get(name, default)))
    except (TypeError, ValueError):
        # Only conversion failures are expected here; anything else is a bug
        # that should surface instead of being silently replaced.
        return default
|
|
|
|
|
|
def configured_url(env_names: tuple[str, ...], config_path: str, default: str = "") -> str:
    """Resolve a URL from the environment, then a config file, then *default*.

    The first non-blank value among *env_names* wins. Otherwise the first
    non-blank line of *config_path* that does not start with ``#`` is used.
    A missing, unreadable or undecodable config file falls back to *default*.
    """
    for name in env_names:
        value = os.environ.get(name, "").strip()
        if value:
            return value
    config_file = Path(config_path).expanduser()
    try:
        lines = config_file.read_text(encoding="utf-8").splitlines()
    except (OSError, UnicodeError):
        # UnicodeError covers a config file that is not valid UTF-8, which
        # would otherwise abort startup with a traceback.
        return default
    for line in lines:
        value = line.strip()
        if value and not value.startswith("#"):
            return value
    return default
|
|
|
|
|
|
def default_api_url() -> str:
    """Return the accounts API URL from env vars, the config file, or the built-in default."""
    env_names = ("SHUSUB2_API_URL", "SUB2API_QUOTA_TUI_API_URL")
    config_path = os.environ.get("SHUSUB2_API_URL_FILE", DEFAULT_CONFIG_FILE)
    return configured_url(env_names, config_path, DEFAULT_API_URL)
|
|
|
|
|
|
def default_status_url() -> str:
    """Return the status URL from ``SHUSUB2_STATUS_URL`` or its config file, else ``""``."""
    config_path = os.environ.get("SHUSUB2_STATUS_URL_FILE", DEFAULT_STATUS_CONFIG_FILE)
    return configured_url(("SHUSUB2_STATUS_URL",), config_path)
|
|
|
|
|
|
def config_file_path() -> Path:
    """Return the user-expanded path of the API URL config file.

    ``SHUSUB2_API_URL_FILE`` overrides the default location.
    """
    configured = os.environ.get("SHUSUB2_API_URL_FILE", DEFAULT_CONFIG_FILE)
    return Path(configured).expanduser()
|
|
|
|
|
|
def write_api_url_config(api_url: str) -> Path:
    """Persist *api_url* to the config file and return that file's path.

    The parent directory is created if needed. Permissions are tightened to
    0o700 (directory) and 0o600 (file) on a best-effort basis.

    Raises:
        ValueError: if *api_url* is empty or blank.
    """

    def restrict(node: Path, mode: int) -> None:
        # Permission tightening is best-effort; failures are ignored.
        try:
            node.chmod(mode)
        except OSError:
            pass

    url = str(api_url or "").strip()
    if not url:
        raise ValueError("api url is empty")
    target = config_file_path()
    folder = target.parent
    folder.mkdir(parents=True, exist_ok=True)
    restrict(folder, 0o700)
    target.write_text(url + "\n", encoding="utf-8")
    restrict(target, 0o600)
    return target
|
|
|
|
|
|
def run_install_command() -> int:
    """Run the ``uv tool install`` command and return its exit status.

    Returns 127 (after printing a hint to stderr) when ``uv`` is not found.
    """
    try:
        completed = subprocess.run(INSTALL_COMMAND_ARGS, check=False)
    except FileNotFoundError:
        print("uv command not found; install uv first, then run:", INSTALL_COMMAND, file=sys.stderr)
        return 127
    return completed.returncode
|
|
|
|
|
|
def inferred_status_url(api_url: str) -> str:
    """Derive the ``/api/status`` URL that sits next to a known accounts endpoint.

    Returns ``""`` when *api_url* has no scheme or host, or when its path does
    not end in ``/api/tui/accounts`` or ``/api/accounts``. Params, query and
    fragment are dropped from the result.
    """
    parts = urllib.parse.urlparse(str(api_url or "").strip())
    if not (parts.scheme and parts.netloc):
        return ""
    trimmed = parts.path.rstrip("/")
    for suffix in ("/api/tui/accounts", "/api/accounts"):
        if trimmed.endswith(suffix):
            prefix = trimmed[: len(trimmed) - len(suffix)]
            rewritten = parts._replace(path=prefix + "/api/status", params="", query="", fragment="")
            return urllib.parse.urlunparse(rewritten)
    return ""
|
|
|
|
|
|
def current_version() -> str:
    """Return the installed package version, or the fallback version on any failure."""
    try:
        installed = importlib.metadata.version(APP_NAME)
    except Exception:
        # Covers PackageNotFoundError as well as any other metadata failure.
        return FALLBACK_VERSION
    return installed
|
|
|
|
|
|
def parse_version(value: Any) -> tuple[int, ...]:
    """Return every run of ASCII digits in *value* as a tuple of ints.

    Falsy input yields an empty tuple.
    """
    digit_runs = re.findall(r"[0-9]+", str(value or ""))
    return tuple(int(run) for run in digit_runs)
|
|
|
|
|
|
def version_is_newer(latest: str, current: str) -> bool:
    """Return True when *latest* parses to a strictly greater version than *current*.

    Both versions are zero-padded to the same length before comparison, so
    ``"1.2"`` and ``"1.2.0"`` compare equal.
    """
    newest = parse_version(latest)
    installed = parse_version(current)
    width = max(len(newest), len(installed), 1)

    def padded(parts: tuple[int, ...]) -> tuple[int, ...]:
        return parts + (0,) * (width - len(parts))

    return padded(newest) > padded(installed)
|
|
|
|
|
|
def latest_version_from_text(text: str) -> str:
    """Extract the first line-anchored ``version = "..."`` value from *text*.

    Both double-quoted and single-quoted (TOML literal) strings are accepted.
    The value is returned stripped, or ``""`` when no such line exists.
    """
    match = re.search(r"""(?m)^version\s*=\s*(?:"([^"]+)"|'([^']+)')""", text)
    if match is None:
        return ""
    # Exactly one of the two alternatives captured a value.
    return (match.group(1) or match.group(2)).strip()
|
|
|
|
|
|
def fetch_latest_version(url: str, timeout: int) -> str:
    """Download up to 64 KiB from *url* and return the version string found in it.

    Returns ``""`` when the text contains no version line. Network errors
    propagate to the caller.
    """
    request = urllib.request.Request(url, headers={"Accept": "text/plain"})
    with urllib.request.urlopen(request, timeout=timeout) as response:
        head = response.read(65536)
    return latest_version_from_text(head.decode("utf-8", errors="replace"))
|
|
|
|
|
|
def version_update_message(latest: str, current: str | None = None) -> str:
    """Return an "update available" notice, or ``""`` when no update applies.

    *current* defaults to the installed version when it is falsy.
    """
    installed = current or current_version()
    if latest and version_is_newer(latest, installed):
        return f"update available: {APP_NAME} {installed} -> {latest}; run `{INSTALL_COMMAND}`"
    return ""
|
|
|
|
|
|
def check_version_update(url: str, timeout: int, *, disabled: bool = False) -> str:
    """Return an update notice for the version published at *url*, or ``""``.

    The check is skipped when *disabled* is true or when
    ``SHUSUB2_NO_VERSION_CHECK`` is set to 1/true/yes/on. Any failure during
    the check is swallowed and reported as "no update".
    """
    opt_out = os.environ.get("SHUSUB2_NO_VERSION_CHECK", "").strip().lower()
    if disabled or opt_out in {"1", "true", "yes", "on"}:
        return ""
    try:
        latest = fetch_latest_version(url, timeout)
        return version_update_message(latest)
    except Exception:
        # Best-effort check: it must never break startup.
        return ""
|
|
|
|
|
|
def as_float(value: Any) -> float:
    """Coerce *value* to a finite float, returning 0.0 when that is impossible.

    ``None``, blank strings, unconvertible values and non-finite results
    (NaN, +/-Infinity) all map to 0.0. Non-finite values are rejected because
    NaN breaks the ordering of the sort keys built from this function.
    """
    if value is None:
        return 0.0
    try:
        if str(value).strip() == "":
            return 0.0
        number = float(value)
    except (TypeError, ValueError, OverflowError):
        # OverflowError: an int too large to be represented as a float.
        return 0.0
    return number if math.isfinite(number) else 0.0
|
|
|
|
|
|
def as_int(value: Any) -> int:
    """Coerce *value* to an int, returning 0 when that is impossible.

    Integers and integer-looking strings are converted exactly. Going through
    ``float`` first would silently corrupt values above 2**53, such as 64-bit
    ids. Other numeric input is truncated toward zero via ``float``.
    """
    if value is None:
        return 0
    if isinstance(value, int):
        return int(value)
    text = str(value).strip()
    if not text:
        return 0
    try:
        return int(text)
    except ValueError:
        pass  # not a plain integer literal; fall back to float parsing
    try:
        return int(float(value))
    except (TypeError, ValueError, OverflowError):
        # ValueError: unparsable text or NaN; OverflowError: infinity.
        return 0
|
|
|
|
|
|
def format_cost(value: Any) -> str:
    """Format *value* as a dollar amount with magnitude-dependent precision.

    Zero renders as ``$0``. Amounts below 0.01 use six decimals, below 10 use
    three, and everything else uses two. Trailing zeros and a trailing dot
    are trimmed.
    """
    amount = as_float(value)
    if amount == 0:
        return "$0"
    magnitude = abs(amount)
    if magnitude < 0.01:
        digits = 6
    elif magnitude < 10:
        digits = 3
    else:
        digits = 2
    return f"${amount:.{digits}f}".rstrip("0").rstrip(".")
|
|
|
|
|
|
def format_count(value: Any) -> str:
    """Format an integer count compactly, e.g. ``1.5K``, ``2M`` or ``999``.

    Values with magnitude of at least one million use an ``M`` suffix and at
    least one thousand use ``K``, each with at most one decimal place.
    """
    number = as_int(value)
    for threshold, suffix in ((1_000_000, "M"), (1_000, "K")):
        if abs(number) >= threshold:
            # Trim the trailing ".0" BEFORE appending the suffix; stripping
            # after the suffix is a no-op because the string then ends in a
            # letter, which left outputs such as "2.0M".
            scaled = f"{number / threshold:.1f}".rstrip("0").rstrip(".")
            return f"{scaled}{suffix}"
    return str(number)
|
|
|
|
|
|
def format_percent(value: Any) -> str:
    """Format *value* as a percentage rounded to one decimal place.

    ``None`` and blank input render as ``-``. Whole numbers drop the decimal
    part.
    """
    if value is None or not str(value).strip():
        return "-"
    rounded = round(as_float(value), 1)
    return f"{int(rounded)}%" if rounded.is_integer() else f"{rounded}%"
|
|
|
|
|
|
def parse_time(value: Any) -> dt.datetime | None:
    """Parse an ISO-8601 timestamp, returning ``None`` for blank or invalid input.

    A trailing ``Z`` is rewritten to ``+00:00`` before parsing.
    """
    text = str(value or "").strip()
    if not text:
        return None
    if text.endswith("Z"):
        text = f"{text[:-1]}+00:00"
    try:
        return dt.datetime.fromisoformat(text)
    except Exception:
        return None
|
|
|
|
|
|
def short_time(value: Any) -> str:
    """Render a timestamp as local ``MM-DD HH:MM``, or ``-`` when it cannot be parsed."""
    moment = parse_time(value)
    if moment:
        return moment.astimezone().strftime("%m-%d %H:%M")
    return "-"
|
|
|
|
|
|
def add_refresh_param(api_url: str, refresh: bool) -> str:
    """Return *api_url* with ``refresh=1`` appended when *refresh* is true.

    Any existing ``refresh`` parameter is removed first. The URL is returned
    untouched when *refresh* is false.
    """
    if not refresh:
        return api_url
    parts = urllib.parse.urlparse(api_url)
    pairs = [
        pair
        for pair in urllib.parse.parse_qsl(parts.query, keep_blank_values=True)
        if pair[0] != "refresh"
    ]
    pairs.append(("refresh", "1"))
    return urllib.parse.urlunparse(parts._replace(query=urllib.parse.urlencode(pairs)))
|
|
|
|
|
|
def fetch_payload(api_url: str, timeout: int, *, refresh: bool = False) -> dict[str, Any]:
    """GET *api_url* and return the decoded JSON object.

    Raises:
        RuntimeError: if the response decodes to something other than a JSON object.

    Network and JSON decoding errors propagate unchanged.
    """
    target = add_refresh_param(api_url, refresh)
    request = urllib.request.Request(target, headers={"Accept": "application/json"})
    with urllib.request.urlopen(request, timeout=timeout) as response:
        body = response.read()
    data = json.loads(body.decode("utf-8"))
    if isinstance(data, dict):
        return data
    raise RuntimeError("API did not return a JSON object")
|
|
|
|
|
|
def fetch_optional_payload(url: str, timeout: int) -> tuple[dict[str, Any], str]:
    """Fetch *url* without raising; return ``(payload, error_message)``.

    A blank URL yields ``({}, "")``. Any fetch failure yields ``({}, str(exc))``.
    """
    if not str(url or "").strip():
        return {}, ""
    try:
        payload = fetch_payload(url, timeout)
    except Exception as exc:
        return {}, str(exc)
    return payload, ""
|
|
|
|
|
|
def kind_label(kind: Any) -> str:
    """Map an account kind to its short display label.

    Unrecognised kinds are returned as text. Falsy kinds become ``unknown``.
    """
    labels = {
        "quota_limited": "quota",
        "pending_quota": "pending",
        "daily_limited": "daily",
        "pay_as_you_go": "usage",
        "disabled": "disabled",
    }
    lookup = str(kind or "")
    if lookup in labels:
        return labels[lookup]
    return str(kind or "unknown")
|
|
|
|
|
|
def provider_label(account: dict[str, Any]) -> str:
    """Return a normalised provider name for *account*.

    Reads ``provider`` and falls back to ``platform``. Names containing
    "anthropic"/"claude" or "openai"/"chatgpt" collapse to the vendor name.
    Other names are returned lower-cased, and a missing name yields ``-``.
    """
    name = str(account.get("provider") or account.get("platform") or "").strip().lower()
    if not name:
        return "-"
    vendors = (
        ("anthropic", ("anthropic", "claude")),
        ("openai", ("openai", "chatgpt")),
    )
    for vendor, markers in vendors:
        if any(marker in name for marker in markers):
            return vendor
    return name
|
|
|
|
|
|
def routing_group_sort_rank(value: Any) -> int:
    """Return the sort rank of a routing group: sfast, fast, slow, id, then everything else."""
    ranks = {"sfast": 0, "fast": 1, "slow": 2, "id": 3}
    group = str(value or "").strip().lower()
    # Blank, "-" and unrecognised groups all sort last.
    return ranks.get(group, 4)
|
|
|
|
|
|
def status_kind(value: Any) -> str:
    """Classify a monitor status string as ``ok``, ``failed`` or ``unknown``."""
    text = str(value or "").strip().lower()
    classes = (("ok", MONITOR_OK_STATUSES), ("failed", MONITOR_FAILED_STATUSES))
    for label, members in classes:
        if text in members:
            return label
    return "unknown"
|
|
|
|
|
|
def format_latency(value: Any) -> str:
    """Format a latency as ``<n>ms``, or ``-`` when it is missing or not positive."""
    millis = as_int(value)
    if millis <= 0:
        return "-"
    return f"{millis}ms"
|
|
|
|
|
|
def monitor_items(status_payload: dict[str, Any]) -> list[dict[str, Any]]:
    """Return the dict entries of ``channel_monitors.items`` from a status payload.

    A missing or malformed ``channel_monitors`` section yields an empty list.
    Non-dict entries are skipped.
    """
    section = status_payload.get("channel_monitors")
    if not isinstance(section, dict):
        return []
    return [entry for entry in section.get("items") or [] if isinstance(entry, dict)]
|
|
|
|
|
|
def monitor_summary(status_payload: dict[str, Any], error: str = "") -> str:
    """Summarise channel-monitor health as a single status line.

    A non-empty *error* takes precedence. A missing or empty
    ``channel_monitors`` section renders as ``monitors -``.
    """
    if error:
        return f"monitors error: {error}"
    section = status_payload.get("channel_monitors")
    if not isinstance(section, dict) or not section:
        return "monitors -"
    enabled, ok, failed, unknown = (
        as_int(section.get(field))
        for field in ("enabled", "latest_ok", "latest_failed", "latest_unknown")
    )
    checked = short_time(section.get("latest_checked_max"))
    return f"monitors {ok} ok / {failed} failed / {unknown} unknown ({enabled} enabled, {checked})"
|
|
|
|
|
|
def text_tokens(value: Any) -> set[str]:
    """Split *value* into lower-cased alphanumeric tokens.

    Stopwords and purely numeric tokens are dropped.
    """
    # Every non-alphanumeric character acts as a separator.
    normalized = "".join(ch.lower() if ch.isalnum() else " " for ch in str(value or ""))
    return {
        word
        for word in normalized.split()
        if word not in MONITOR_STOPWORDS and not word.isdigit()
    }
|
|
|
|
|
|
def matching_monitor(row: dict[str, Any], status_payload: dict[str, Any]) -> dict[str, Any] | None:
    """Find the channel monitor that best corresponds to an account row.

    An exact ``base_url_hash`` match wins outright. Otherwise monitors are
    scored by name-token overlap, with a +10 bonus when one token set
    contains the other. The first highest-scoring monitor is returned, or
    ``None`` when nothing overlaps.
    """
    wanted_hash = str(row.get("base_url_hash") or "").strip()
    if wanted_hash:
        for candidate in monitor_items(status_payload):
            if str(candidate.get("base_url_hash") or "").strip() == wanted_hash:
                return candidate

    account_tokens = text_tokens(row.get("name"))
    if not account_tokens:
        return None

    scored: list[tuple[int, dict[str, Any]]] = []
    for candidate in monitor_items(status_payload):
        monitor_tokens = text_tokens(candidate.get("name"))
        shared = account_tokens & monitor_tokens
        if not shared:
            # Also covers monitors whose name yields no tokens at all.
            continue
        nested = account_tokens <= monitor_tokens or monitor_tokens <= account_tokens
        scored.append((len(shared) + (10 if nested else 0), candidate))

    if not scored:
        return None
    # max() keeps the first of equally scored candidates.
    return max(scored, key=lambda pair: pair[0])[1]
|
|
|
|
|
|
def monitor_detail(row: dict[str, Any], status_payload: dict[str, Any]) -> str:
    """Describe the monitor matched to *row*: status, latency, check time and message.

    Returns ``monitor -`` when no monitor matches.
    """
    item = matching_monitor(row, status_payload)
    if not item:
        return "monitor -"
    status = str(item.get("latest_status") or "unknown")
    latency = format_latency(item.get("latency_ms"))
    checked = short_time(item.get("checked_at"))
    pieces = [f"monitor {status} {latency} checked {checked}"]
    message = str(item.get("message") or "").strip()
    if message:
        pieces.append(message)
    return " ".join(pieces)
|
|
|
|
|
|
def monitor_availability(row: dict[str, Any], status_payload: dict[str, Any]) -> str:
    """Return the availability cell for *row* based on its matched monitor.

    Possible values are ``-`` (no monitor), ``disabled``, ``unknown`` (no
    status reported), ``ok``, ``failed``, or the raw lower-cased status when
    it is not a recognised one.
    """
    item = matching_monitor(row, status_payload)
    if not item:
        return "-"
    if item.get("enabled") is False:
        return "disabled"
    raw_status = str(item.get("latest_status") or "").strip().lower()
    if not raw_status:
        return "unknown"
    kind = status_kind(raw_status)
    return kind if kind in ("ok", "failed") else raw_status
|
|
|
|
|
|
def window_for(account: dict[str, Any], window_id: str) -> dict[str, Any]:
    """Return the first dict in ``account["windows"]`` whose ``id`` equals *window_id*, else ``{}``."""
    candidates = (
        window
        for window in account.get("windows") or []
        if isinstance(window, dict) and window.get("id") == window_id
    )
    return next(candidates, {})
|
|
|
|
|
|
def window_cell(account: dict[str, Any], window_id: str) -> str:
    """Render a usage window as ``<used>/<remaining>`` percentages.

    Returns ``-`` when the window is missing or has no ``used_percent``.
    """
    window = window_for(account, window_id)
    if window and window.get("used_percent") is not None:
        used = format_percent(window.get("used_percent"))
        remaining = format_percent(window.get("remaining_percent"))
        return f"{used}/{remaining}"
    return "-"
|
|
|
|
|
|
def daily_quota_cell(account: dict[str, Any]) -> str:
    """Render the daily quota as ``<used>/<limit>``, or ``-`` when no limit is set.

    A missing or falsy ``daily_quota_used`` is shown as 0.
    """
    limit = account.get("daily_quota_limit")
    if limit is None:
        return "-"
    used = account.get("daily_quota_used") or 0
    return f"{used}/{limit}"
|
|
|
|
|
|
def reset_cell(account: dict[str, Any]) -> str:
    """Return the first available reset time for *account*.

    Checks the daily quota reset first, then the ``five-hour`` window, then
    the ``weekly`` window. Returns ``-`` when none of them has a parsable time.
    """
    stamp = short_time(account.get("daily_quota_reset_at"))
    if stamp != "-":
        return stamp
    windows = [window_for(account, window_id) for window_id in ("five-hour", "weekly")]
    for window in windows:
        stamp = short_time(window.get("reset"))
        if stamp != "-":
            return stamp
    return "-"
|
|
|
|
|
|
def normalize_account_rows(payload: dict[str, Any], filter_text: str = "") -> list[dict[str, Any]]:
    """Convert ``payload["accounts"]`` into sorted, display-ready row dicts.

    Non-dict entries are skipped. When *filter_text* is non-blank, only
    accounts whose searchable fields contain it (case-insensitively) are kept.
    Rows are ordered by routing-group rank, disabled-last, then descending
    cost, tokens, daily-quota percent and max quota percent, then by name.
    """
    searchable_fields = (
        "id",
        "name",
        "routing_group",
        "provider",
        "kind",
        "status",
        "account_type",
        "platform",
        "plan",
        "daily_quota_timezone",
    )
    needle = filter_text.strip().lower()

    def sort_key(item: dict[str, Any]) -> tuple:
        return (
            routing_group_sort_rank(item["routing_group"]),
            1 if item["kind"] == "disabled" else 0,
            -as_float(item["today_cost_usd"]),
            -as_int(item["today_tokens"]),
            -as_float(item["daily_quota_used_percent"]),
            -as_float(item["quota_used_percent_max"]),
            str(item["name"]).lower(),
        )

    rows: list[dict[str, Any]] = []
    for account in payload.get("accounts") or []:
        if not isinstance(account, dict):
            continue
        if needle:
            haystack = " ".join(str(account.get(field) or "") for field in searchable_fields).lower()
            if needle not in haystack:
                continue
        daily_quota = account.get("daily_quota")
        rows.append(
            {
                "id": as_int(account.get("id")),
                "name": str(account.get("name") or ""),
                "routing_group": str(account.get("routing_group") or "-"),
                "base_url_hash": str(account.get("base_url_hash") or ""),
                "provider": provider_label(account),
                "kind": str(account.get("kind") or "unknown"),
                "kind_label": kind_label(account.get("kind")),
                "status": str(account.get("status") or ""),
                "account_type": str(account.get("account_type") or ""),
                "plan": str(account.get("plan") or ""),
                "priority": account.get("priority"),
                "today_cost_usd": as_float(account.get("today_cost_usd")),
                "today_actual_cost_usd": as_float(account.get("today_actual_cost_usd")),
                "today_tokens": as_int(account.get("today_tokens")),
                "today_requests": as_int(account.get("today_requests")),
                "daily_quota": daily_quota if isinstance(daily_quota, dict) else {},
                "daily_quota_limit": account.get("daily_quota_limit"),
                "daily_quota_used": account.get("daily_quota_used"),
                "daily_quota_remaining": account.get("daily_quota_remaining"),
                "daily_quota_used_percent": account.get("daily_quota_used_percent"),
                "daily_quota_reset_at": account.get("daily_quota_reset_at"),
                "daily_quota_cell": daily_quota_cell(account),
                "quota_used_percent_max": account.get("quota_used_percent_max"),
                "five_hour": window_cell(account, "five-hour"),
                "weekly": window_cell(account, "weekly"),
                "reset": reset_cell(account),
                "latest_usage_at": short_time(account.get("latest_usage_at") or account.get("last_used_at")),
                "error": str(account.get("error") or ""),
                # Keep the untouched source record alongside the derived fields.
                "raw": account,
            }
        )
    rows.sort(key=sort_key)
    return rows
|
|
|
|
|
|
def summary_line(payload: dict[str, Any]) -> str:
    """Build the one-line header: generation time, cache state and today's totals."""
    totals = payload.get("totals")
    if not isinstance(totals, dict):
        totals = {}
    generated = short_time(payload.get("generated_local") or payload.get("generated_at"))
    freshness = "cached" if payload.get("cached") else "live"
    segments = [
        f"{generated} {freshness}",
        f"{as_int(totals.get('usable_accounts'))}/{as_int(totals.get('total_accounts'))} usable",
        f"today {format_cost(totals.get('today_cost_usd'))}",
        f"{format_count(totals.get('today_tokens'))} tokens",
        f"{format_count(totals.get('today_requests'))} req",
    ]
    return " | ".join(segments)
|
|
|
|
|
|
def print_once(payload: dict[str, Any], filter_text: str = "", status_payload: dict[str, Any] | None = None, status_error: str = "") -> None:
    """Print a one-shot plain-text snapshot of the accounts table to stdout.

    Output is the summary line, an optional monitor summary (when a status
    payload or status error is present), a header row, and one row per
    account that passes *filter_text*.
    """
    # (title, width) for every padded column. The header and the data rows are
    # both rendered from this spec so the header always lines up with the rows.
    # The trailing "availability" column is unpadded.
    columns = (
        ("name", 30),
        ("provider", 9),
        ("group", 6),
        ("daily", 10),
        ("today", 10),
        ("tokens", 8),
        ("req", 5),
        ("kind", 9),
        ("5h", 9),
        ("7d", 9),
        ("reset", 11),
        ("status", 10),
    )
    monitors = status_payload or {}

    print(summary_line(payload))
    if status_payload or status_error:
        print(monitor_summary(monitors, status_error))
    print("".join(f"{title:<{width}} " for title, width in columns) + "availability")
    for row in normalize_account_rows(payload, filter_text):
        cells = (
            row["name"][:30],  # truncate so long names cannot shift the columns
            row["provider"],
            row["routing_group"],
            row["daily_quota_cell"],
            format_cost(row["today_cost_usd"]),
            format_count(row["today_tokens"]),
            format_count(row["today_requests"]),
            row["kind_label"],
            row["five_hour"],
            row["weekly"],
            row["reset"],
            row["status"],
        )
        padded = "".join(f"{cell:<{width}} " for cell, (_, width) in zip(cells, columns))
        print(f"{padded}{monitor_availability(row, monitors)}")
|
|
|
|
|
|
def run_textual(api_url: str, status_url: str, refresh_seconds: int, timeout: int, version_message: str = "") -> int:
    """Run the interactive Textual UI until the user quits.

    Args:
        api_url: accounts endpoint polled for table data.
        status_url: optional status endpoint for monitor health; may be blank.
        refresh_seconds: interval between automatic refreshes.
        timeout: per-request timeout in seconds.
        version_message: optional update notice shown in the status bar.

    Returns:
        0 after the app exits, or 2 when Textual is not installed.
    """
    try:
        from textual.app import App, ComposeResult
        from textual.widgets import DataTable, Footer, Header, Input, Static
    except ImportError:
        print("Textual is required. Run with: uv run --with textual python sub2api_quota_tui.py", file=sys.stderr)
        return 2

    class Sub2APIQuotaApp(App[None]):
        """Table view of accounts with a filter box, a detail pane and a status bar."""

        CSS = """
        #summary { height: 1; padding: 0 1; color: $accent; }
        #filter { height: 3; }
        #accounts { height: 1fr; }
        #detail { height: 3; padding: 0 1; border-top: solid $panel; }
        #status { height: 1; padding: 0 1; color: $text-muted; }
        """

        BINDINGS = [
            ("q", "quit", "Quit"),
            ("r", "refresh", "Refresh"),
            ("/", "focus_filter", "Filter"),
        ]

        def __init__(self) -> None:
            super().__init__()
            self.payload: dict[str, Any] = {}
            self.status_payload: dict[str, Any] = {}
            self.status_error = ""
            self.rows: list[dict[str, Any]] = []
            # Maps DataTable row keys back to the normalised row dicts.
            self.row_by_key: dict[str, dict[str, Any]] = {}

        def compose(self) -> ComposeResult:
            yield Header(show_clock=True)
            yield Static("", id="summary")
            yield Input(placeholder="filter", id="filter")
            yield DataTable(id="accounts")
            yield Static("", id="detail")
            yield Static("", id="status")
            yield Footer()

        def on_mount(self) -> None:
            table = self.query_one("#accounts", DataTable)
            table.cursor_type = "row"
            table.zebra_stripes = True
            table.add_columns("Name", "Provider", "Group", "Daily", "Today", "Tokens", "Req", "Kind", "5h", "7d", "Reset", "Status", "Availability")
            self.refresh_data(refresh=True)
            self.set_interval(refresh_seconds, self.refresh_data)

        def action_refresh(self) -> None:
            self.refresh_data(refresh=True)

        def action_focus_filter(self) -> None:
            self.query_one("#filter", Input).focus()

        def on_input_changed(self, event: Input.Changed) -> None:
            if event.input.id == "filter":
                self.render_payload()

        def refresh_data(self, refresh: bool = False) -> None:
            """Fetch both payloads and redraw; any failure is shown in the status bar."""
            status = self.query_one("#status", Static)
            status.update("refreshing...")
            try:
                self.payload = fetch_payload(api_url, timeout, refresh=refresh)
                self.status_payload, self.status_error = fetch_optional_payload(status_url, timeout)
                self.render_payload()
                status_bits = [version_message, monitor_summary(self.status_payload, self.status_error), f"source {self.payload.get('source_name') or '-'}"]
                status.update(" | ".join(bit for bit in status_bits if bit) + f" | {api_url}")
            except Exception as exc:
                status.update(f"error: {exc}")

        def render_payload(self) -> None:
            """Rebuild the table from the current payload and filter text."""
            filter_text = self.query_one("#filter", Input).value
            self.rows = normalize_account_rows(self.payload, filter_text)
            table = self.query_one("#accounts", DataTable)
            table.clear()
            self.row_by_key = {}
            for index, row in enumerate(self.rows):
                key = str(row["id"])
                if key in self.row_by_key:
                    # DataTable rejects duplicate row keys. Ids can collide
                    # (for example non-numeric ids all normalise to 0), so the
                    # key is disambiguated with the row position.
                    key = f"{key}#{index}"
                self.row_by_key[key] = row
                table.add_row(
                    row["name"],
                    row["provider"],
                    row["routing_group"],
                    row["daily_quota_cell"],
                    format_cost(row["today_cost_usd"]),
                    format_count(row["today_tokens"]),
                    format_count(row["today_requests"]),
                    row["kind_label"],
                    row["five_hour"],
                    row["weekly"],
                    row["reset"],
                    row["status"],
                    monitor_availability(row, self.status_payload),
                    key=key,
                )
            self.query_one("#summary", Static).update(summary_line(self.payload))
            if self.rows:
                self.render_detail(self.rows[0])
            else:
                self.query_one("#detail", Static).update("no accounts")

        def on_data_table_row_highlighted(self, event: DataTable.RowHighlighted) -> None:
            key = str(event.row_key.value)
            row = self.row_by_key.get(key)
            if row:
                self.render_detail(row)

        def render_detail(self, row: dict[str, Any]) -> None:
            """Show the expanded one-line description of *row* in the detail pane."""
            raw = row.get("raw") if isinstance(row.get("raw"), dict) else {}
            detail = (
                f"{row['name']} | {row['provider']} | group {row['routing_group']} | {row['kind_label']} | {row['account_type']} | "
                f"daily {row['daily_quota_cell']} ({format_percent(row['daily_quota_used_percent'])} used, "
                f"{row['daily_quota_remaining'] if row['daily_quota_remaining'] is not None else '-'} left) | "
                f"today {format_cost(row['today_cost_usd'])}, {format_count(row['today_tokens'])} tokens, "
                f"{format_count(row['today_requests'])} req | latest {row['latest_usage_at']} | "
                f"priority {row['priority'] if row['priority'] is not None else '-'} | "
                f"{monitor_detail(row, self.status_payload)}"
            )
            error = str(raw.get("error") or "").strip()
            if error:
                detail += f" | {error}"
            self.query_one("#detail", Static).update(detail)

    Sub2APIQuotaApp().run()
    return 0
|
|
|
|
|
|
def build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser; defaults come from env vars and config files."""
    truthy = {"1", "true", "yes", "on"}
    version_check_off = os.environ.get("SHUSUB2_NO_VERSION_CHECK", "").strip().lower() in truthy

    cli = argparse.ArgumentParser(description="Sub2API quota and daily usage TUI")
    add = cli.add_argument
    add("--api-url", default=default_api_url())
    add("--status-url", default=default_status_url(), help="optional sub2api-status /api/status URL for channel monitor health")
    add("--save-config", action="store_true", help="persist --api-url to ~/.config/shusub2/api-url before running")
    add("--install", action="store_true", help="persist --api-url, install shusub2 as a uv tool, then exit")
    add("--version-check-url", default=os.environ.get("SHUSUB2_VERSION_CHECK_URL", DEFAULT_VERSION_CHECK_URL))
    add(
        "--version-check-timeout",
        type=int,
        default=env_int("SHUSUB2_VERSION_CHECK_TIMEOUT", DEFAULT_VERSION_CHECK_TIMEOUT_SECONDS),
    )
    add("--no-version-check", action="store_true", default=version_check_off)
    add(
        "--refresh-seconds",
        type=int,
        default=env_int("SUB2API_QUOTA_TUI_REFRESH_SECONDS", DEFAULT_REFRESH_SECONDS),
    )
    add("--timeout", type=int, default=env_int("SUB2API_QUOTA_TUI_TIMEOUT", DEFAULT_TIMEOUT_SECONDS))
    add("--once", action="store_true", help="print one snapshot and exit")
    add("--filter", default="", help="initial filter for --once output")
    return cli
|
|
|
|
|
|
def main(argv: list[str] | None = None) -> int:
    """Program entry point; returns the process exit status.

    Handles ``--save-config`` / ``--install``, runs the optional version
    check, then either prints one snapshot (``--once``) or starts the TUI.
    """
    args = build_parser().parse_args(argv)
    status_url = str(args.status_url or "").strip() or inferred_status_url(args.api_url)
    # Clamp once so the --once path and the TUI path use the same timeout;
    # zero or negative values are not usable socket timeouts.
    timeout = max(1, args.timeout)

    if args.save_config or args.install:
        config_path = write_api_url_config(args.api_url)
        print(f"saved api url to {config_path}")
    if args.install:
        print("installing shusub2 with uv tool...")
        return run_install_command()

    version_message = check_version_update(
        args.version_check_url,
        max(1, args.version_check_timeout),
        disabled=bool(args.no_version_check),
    )
    if args.once:
        if version_message:
            print(version_message)
        status_payload, status_error = fetch_optional_payload(status_url, timeout)
        print_once(fetch_payload(args.api_url, timeout, refresh=True), args.filter, status_payload, status_error)
        return 0
    return run_textual(args.api_url, status_url, max(1, args.refresh_seconds), timeout, version_message)
|
|
|
|
|
|
# Script entry point: exit with main()'s return code.
if __name__ == "__main__":
    sys.exit(main())
|