虽是V0.3版本,但实际上对体验进行了大量完善优化,力求降低AI接入的门槛,为更多人人群带来专注AI体验; 1.独立导入模式支持:支持直接从当前浏览器打开的中转站自动化导入了 插件/备份导入依旧支持,可通过此最佳实践路径补充完善; 2.批量检测模型进一步优化:余额查看、快速对话、模型性能统计(TTFT、TPS、Latency)能力、站点管理 引入站点管理,简化每次导入逻辑,批量检测更加方便 3.高级代理网关,支持优先级自定义、自动切换、故障转移、多渠道队列、整流修正功能; 支持claude客户端使用openai等模型、自行排序队列、RPM限定等高级设置 mini侧栏可实时观看哪些节点被调度中、实时检测调用性能 4.优化体验:自定义key、标签备注、侧栏实时队列监控、全平台适配已完成 如有疏漏不佳体验、欢迎继续发掘指正问题 开源地址在初始发布贴: https://linux.do/t/topic/1828809 1 个帖子 - 1 位参与者 阅读完整话题
any路由器因为跟claude code有一些参数适配的问题,所以我们可以在本地架设一个简单的网关,将参数在本地拦截,然后修改一下,再传给any大善人,就可以绕过这些参数适配的小问题了。 claudecode最新版本适用,不需要回退版本 这个本地网关做了什么 在本地监听 127.0.0.1:1998。 把 Claude Code 的请求转发到上游any的端口。 自动补认证头(Authorization / x-api-key)。 对 haiku 请求额外修正:补 context-1m-2025-08-07,并加 thinking.budget_tokens=1024。 把请求和响应写到 gateway_requests.jsonl 方便排错。 极简启动步骤 先开网关 export ANTHROPIC_BASE_URL=“any大善人地址” export ANTHROPIC_AUTH_TOKEN=“你的真实token” python3 /Users/Apple/Desktop/code/claude_gateway.py 新开一个终端再开 Claude Code ANTHROPIC_BASE_URL=“ http://127.0.0.1:1998 ” claude --enable-auto-mode 截图为证: 网关代码如下(vibe写的,很多冗余,大佬可以自行修改): #!/usr/bin/env python3 import base64 import json import os import threading from datetime import datetime, timezone from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer from urllib.error import HTTPError, URLError from urllib.parse import urlsplit, urlunsplit from urllib.request import Request, urlopen LISTEN_HOST = os.getenv("CLAUDE_GATEWAY_HOST", "127.0.0.1") LISTEN_PORT = int(os.getenv("CLAUDE_GATEWAY_PORT", "1998")) UPSTREAM_BASE_URL = os.getenv( "ANTHROPIC_BASE_URL", "https://a-ocnfniawgw.cn-shanghai.fcapp.run" ) UPSTREAM_AUTH_TOKEN = os.getenv("ANTHROPIC_AUTH_TOKEN", "") UPSTREAM_TIMEOUT = float(os.getenv("CLAUDE_GATEWAY_TIMEOUT", "120")) LOG_PATH = os.getenv( "CLAUDE_GATEWAY_LOG", os.path.join(os.path.dirname(__file__), "gateway_requests.jsonl") ) LOG_LOCK = threading.Lock() def utc_now_iso() -> str: return datetime.now(timezone.utc).isoformat() def ensure_log_parent_exists() -> None: parent = os.path.dirname(LOG_PATH) if parent: os.makedirs(parent, exist_ok=True) def decode_body_for_log(body: bytes) -> dict: if not body: return {"encoding": "utf-8", "text": ""} try: return {"encoding": "utf-8", "text": body.decode("utf-8")} except UnicodeDecodeError: return {"encoding": "base64", "text": base64.b64encode(body).decode("ascii")} def append_log(record: dict) -> None: ensure_log_parent_exists() line = json.dumps(record, ensure_ascii=False) with LOG_LOCK: with open(LOG_PATH, "a", encoding="utf-8") as f: f.write(line + "\n") def build_upstream_url(base_url: str, incoming_path: str) -> str: base = urlsplit(base_url) incoming = urlsplit(incoming_path) incoming_path_only = incoming.path or "/" base_path = base.path.rstrip("/") if base_path: merged_path = f"{base_path}{incoming_path_only}" else: merged_path = incoming_path_only return urlunsplit((base.scheme, base.netloc, merged_path, incoming.query, "")) def rewrite_request_headers(headers: dict, path: str) -> dict: rewritten = dict(headers) if UPSTREAM_AUTH_TOKEN: has_x_api_key = any(k.lower() == "x-api-key" for k in rewritten) has_authorization = any(k.lower() == "authorization" for k in rewritten) if not has_x_api_key: rewritten["x-api-key"] = UPSTREAM_AUTH_TOKEN if not has_authorization: rewritten["Authorization"] = f"Bearer {UPSTREAM_AUTH_TOKEN}" # 先做骨架,后续按 any 规则逐步覆写。 return rewritten def rewrite_request_body(body: bytes, headers: dict, path: str) -> bytes: if not body: return body content_type = "" for k, v in headers.items(): if k.lower() == "content-type": content_type = v break if "application/json" not in content_type.lower(): return body try: payload = json.loads(body.decode("utf-8")) except (UnicodeDecodeError, json.JSONDecodeError): return body model = str(payload.get("model", "")).lower() if not model.startswith("claude-haiku"): return body beta_key = None for k in headers.keys(): if k.lower() == "anthropic-beta": beta_key = k break raw_beta = headers.get(beta_key, "") if beta_key else "" beta_features = [item.strip() for item in raw_beta.split(",") if item.strip()] if "context-1m-2025-08-07" not in beta_features: beta_features.append("context-1m-2025-08-07") merged_beta = ",".join(beta_features) if beta_key: headers[beta_key] = merged_beta else: headers["anthropic-beta"] = merged_beta payload["thinking"] = {"type": "enabled", "budget_tokens": 1024} return json.dumps(payload, ensure_ascii=False, separators=(",", ":")).encode("utf-8") class ClaudeGatewayHandler(BaseHTTPRequestHandler): protocol_version = "HTTP/1.1" def do_GET(self): self._handle_proxy() def do_POST(self): self._handle_proxy() def do_PUT(self): self._handle_proxy() def do_PATCH(self): self._handle_proxy() def do_DELETE(self): self._handle_proxy() def do_OPTIONS(self): self._handle_proxy() def do_HEAD(self): self._handle_proxy() def log_message(self, fmt, *args): return def _read_request_body(self) -> bytes: content_length = int(self.headers.get("Content-Length", "0") or "0") if content_length <= 0: return b"" return self.rfile.read(content_length) def _copy_request_headers(self) -> dict: headers = {} for key, value in self.headers.items(): key_l = key.lower() if key_l in {"host", "content-length", "connection", "accept-encoding"}: continue headers[key] = value return headers def _send_response(self, status: int, headers: dict, body: bytes) -> None: self.send_response(status) ignored = {"transfer-encoding", "content-length", "connection"} for k, v in headers.items(): if k.lower() in ignored: continue self.send_header(k, v) self.send_header("Content-Length", str(len(body))) self.send_header("Connection", "close") self.end_headers() if self.command != "HEAD" and body: self.wfile.write(body) def _handle_proxy(self): req_body = self._read_request_body() req_headers = self._copy_request_headers() req_headers = rewrite_request_headers(req_headers, self.path) req_body = rewrite_request_body(req_body, req_headers, self.path) upstream_url = build_upstream_url(UPSTREAM_BASE_URL, self.path) request_log = { "timestamp": utc_now_iso(), "client_ip": self.client_address[0] if self.client_address else "", "method": self.command, "path": self.path, "upstream_url": upstream_url, "headers": dict(self.headers.items()), "body": decode_body_for_log(req_body), "body_length": len(req_body), } try: upstream_req = Request( url=upstream_url, data=req_body if req_body else None, headers=req_headers, method=self.command, ) with urlopen(upstream_req, timeout=UPSTREAM_TIMEOUT) as resp: resp_status = resp.getcode() resp_headers = dict(resp.headers.items()) resp_body = resp.read() request_log["response"] = { "status": resp_status, "headers": resp_headers, "body": decode_body_for_log(resp_body), "body_length": len(resp_body), } append_log(request_log) self._send_response(resp_status, resp_headers, resp_body) return except HTTPError as e: err_body = e.read() if hasattr(e, "read") else b"" err_headers = dict(e.headers.items()) if getattr(e, "headers", None) else {} request_log["response"] = { "status": e.code, "headers": err_headers, "body": decode_body_for_log(err_body), "body_length": len(err_body), } append_log(request_log) self._send_response(e.code, err_headers, err_body) return except (URLError, TimeoutError, Exception) as e: error_payload = { "error": "gateway_upstream_error", "message": str(e), } error_body = json.dumps(error_payload, ensure_ascii=False).encode("utf-8") request_log["response"] = { "status": 502, "headers": {"Content-Type": "application/json; charset=utf-8"}, "body": {"encoding": "utf-8", "text": error_body.decode("utf-8")}, "body_length": len(error_body), } append_log(request_log) self._send_response( 502, {"Content-Type": "application/json; charset=utf-8"}, error_body, ) def main(): server = ThreadingHTTPServer((LISTEN_HOST, LISTEN_PORT), ClaudeGatewayHandler) print(f"[gateway] listening on http://{LISTEN_HOST}:{LISTEN_PORT}") print(f"[gateway] upstream: {UPSTREAM_BASE_URL}") print(f"[gateway] auth token configured: {bool(UPSTREAM_AUTH_TOKEN)}") print(f"[gateway] log file: {LOG_PATH}") server.serve_forever() if __name__ == "__main__": main() 3 个帖子 - 2 位参与者 阅读完整话题
可以打一网关,但是显示 版本:不适用,还有图中的信息。 openclaw 4 个帖子 - 4 位参与者 阅读完整话题
一、背景 用 New-API 做统一网关,接入讯飞星辰 Coding Plan 的 astron-code-latest 模型,目标是后续给 Trae 提供 OpenAI 兼容接口。 二、已完成的配置 三、核心问题 New-API 后台渠道测试时: 手动打开「流式开关」:测试 100% 成功,正常返回模型响应 - 关闭「流式开关」:直接报错,返回: invalid character ‘<’ looking for beginning of value 四、已排查的点 讯飞接口本身正常:用官方工具测试流式 / 非流式,发现官方也只返回流式结果 New-API 容器正常: docker ps 显示端口映射正确,WSL 内部访问正常 网络正常:能正常访问讯飞接口,无防火墙 / 代理拦截 配置无拼写错误:地址、模型 ID、密钥反复核对过 五、我的疑问&求救 讯飞星辰 Coding Plan 的 /v2 接口本身就不支持非流式请求吗?还是我哪里配置错了? New-API 里有没有办法强制开启流式转发?不管上游发什么请求,都自动转成流式发给讯飞? 有没有人遇到过同样的问题?你们是怎么解决的? 2 个帖子 - 1 位参与者 阅读完整话题
今天碰见一个小程序抓不到包,最开始怀疑是云函数然后狠狠的分析了一波不是。在源码里面看见了是微信网关。。。。。 1 个帖子 - 1 位参与者 阅读完整话题
本帖使用社区开源推广,符合推广要求。我申明并遵循社区要求的以下内容: 我的帖子已经打上 开源推广 标签: 是 我的开源项目完整开源,无未开源部分: 是 我的开源项目已链接认可 LINUX DO 社区: 是 我帖子内的项目介绍,AI生成、润色内容部分已截图发出: 是 以上选择我承诺是永久有效的,接受社区和佬友监督: 是 项目地址 github.com GitHub - LoopOuroboros/mcp-hub-lite: A lightweight MCP management platform designed for... A lightweight MCP management platform designed for independent developers, providing MCP server gateway, grouping, fuzzy search, and MCP HttpStream protocol interface. 本项目是一个开发者本地使用的MCP网关。 解决痛点 在ClaudeCode等多个客户端中如果配置了npx、uvx之类的MCP,会在本地启动多个进程,但是MCP的使用率并不值得这些进程重复占用系统资源。 MCP在部分客户端存在工具数量爆炸的问题,需要渐进式发现。 多个相同类型的MCP,仅部分参数、变量不一致,需要将其定义成多个MCP服务来区分。 聚合部分常用的MCP的单个工具到网关上,直接暴露给客户端使用。 提供资源级别暴露方式。 MCP现在在被CLI的方式所替代。 使用方式 # 安装 npm install -g @loop_ouroboros/mcp-hub-lite@latest # 启动 mcp-hub-lite start # 停止 mcp-hub-lite stop # UI界面 mcp-hub-lite ui 渐进式发现介绍 当前的MCP暴露通过 list_servers 暴露当前有哪些MCP服务器,因为MCP协议并没有服务器级别的描述,所以项目追加了一个服务器级别的描述字段,给大模型用于第一轮筛选 list_tools 暴露当前选中的MCP服务器有哪些工具,但是本轮不直接暴露工具的输入参数,用于减少Token get_tool 获取MCP服务器下指定工具的输入参数等完整信息 call_tool 实际调用MCP服务器的工具,并可以通过追加标签来筛选后端指定实例 UI界面展示 MCP服务器列表 MCP服务器模版配置页 MCP服务器实例配置页 此处允许配置多个实例(例如修改认证头,代表多个账号的实例,实例可以修改名称更加直观) MCP服务器工具详情页 此处的聚合功能为将该MCP服务的一条工具聚合到MCP网关上,连同MCP网关的系统工具一起暴露给客户端使用,效果如下: MCP服务器工具调用调试页 客户端使用 项目针对每一个MCP服务生成了对应的资源,使得可以通过客户端直接加载资源的方式提示大模型使用指定的MCP服务 渐进式调用如下: MCP CLI方式 2 个帖子 - 2 位参与者 阅读完整话题
主站 ai.centos.hk New API 统一的 AI 模型聚合与分发网关,支持将各类大语言模型跨格式转换为 OpenAI、Claude、Gemini 兼容接口,为个人与企业提供集中式模型管理与网关服务。 折扣详情 (充值比例为一比一,也就是一元一刀) Claude max分组倍率降至 1.55 Claude 官key api分组倍率降至 3.00 官方Q群 进群答案:星辰ai qun.qq.com QQ群 QQ 免费试用体验 体验兑换码领取链接 cdk.linux.do LINUX DO CDK Linux Do 社区 CDK 快速分享平台 - 让分享变得更简单 29 个帖子 - 23 位参与者 阅读完整话题
IT之家 4 月 14 日消息,远江盛邦安全科技集团股份有限公司宣布其链路加密网关(RayHLink)正式获得商用密码产品认证证书,拥有国密原生支持、超高吞吐、极低时延等核心优势,打破行业“安全与速度不可兼得”的魔咒。 IT之家注:加密网关是一种安全设备,会在数据离开内部网络(如内网、数据中心)传输至外部时,自动对其进行加密、身份认证与访问控制,形成一道安全“闸门”,但加解密等操作会造成数据访问延缓等问题。 据介绍,商用密码产品认证是国家对密码产品安全性、合规性的认可,更是产品进入政务、金融、能源、算力网络等关键领域的核心准入门槛,能否获得认证直接决定产品能否满足行业密评落地要求。 这是 200G 吞吐量商密产品首次获得该证书,标志着产品全维度符合国家商用密码标准规范,完美适配各行业商用密码应用安全性评估要求。 据介绍,这款产品深度适配 SM2、SM3、SM4 等国密算法,可选配国际通用算法,实现国密算法和国际算法的超高速硬件并行处理,核心算法与 CPU+FPGA 全国产化异构架构 100% 自主可控,从根源上杜绝技术后门;产品支持 X.509 证书管理、外部工作密钥接收、策略集中管控,让国密应用更灵活、更易管。 在核心性能上,这款产品经中国信通院泰尔实验室权威检测,整机实现 200Gbps 明通吞吐、195Gbps 加密吞吐,单口加密速率达 97.68Gbps,逼近 100G 网络线速极限,吞吐量较传统方案提升 5 倍;加密时延平均低于 3 微秒,64 字节小包时延仅 1.128 微秒,较传统方案性能缩短 300 倍。 另外,这款产品在合规与性能双优的基础上还支持 IP 层明通 / 密通灵活切换、SNMP 远程监控,可完美适配“东数西算”跨域算力调度、智算中心 AI 训练、卫星互联网高通量传输、金融高频交易等各类严苛场景。