相对管理后台的 ssl来说,其实网站的 ssl 证书才是正事,毕竟这个关系到网站的访问。按照官方的说法在开放 80 端口的情况下,南墙可以自动申请更新证书,不过后台没找到配置的地方,我的 v4 的 80 也是不通的,所以就需要自己去维护管理证书了。
然而,上午在问了管理之后,得到的答复是没有 api,可以自己抓包进行修改。
嗐,这么看来其实也没啥,最起码说明后台的 api 接口是可以直接拿来用的。即使是有 api 文档,也是得自己去看,去写,没有的话 curl 抓包一样能解决问题。按照之前的方法,只直接复制 curl 给 cursor 就可以了。
api 文件baby_nanqiang_api_tools.py内容:
#!/usr/bin/env python3 import requests import json import jwt from datetime import datetime import os import urllib3 # 禁用 SSL 验证警告 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) class NanQiangAPI: def __init__(self, base_url="https://lang.bi:443"): self.base_url = base_url self.session = requests.Session() self.session.verify = False # 忽略SSL证书验证 self.token = None self._setup_headers() def _setup_headers(self): """设置请求头""" self.headers = { 'accept': 'application/json, text/plain, */*', 'accept-language': 'zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7', 'cache-control': 'no-cache', 'content-type': 'application/json', 'origin': self.base_url, 'pragma': 'no-cache', 'priority': 'u=1, i', 'referer': f'{self.base_url}/', 'sec-ch-ua': '"Chromium";v="134", "Not:A-Brand";v="24", "Google Chrome";v="134"', 'sec-ch-ua-mobile': '?0', 'sec-ch-ua-platform': '"macOS"', 'sec-fetch-dest': 'empty', 'sec-fetch-mode': 'cors', 'sec-fetch-site': 'same-origin', 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36' } def _update_headers_with_token(self): """更新请求头,添加token""" if self.token: self.headers['Authorization'] = self.token # 直接使用token,不添加'Bearer '前缀 def delete_cert(self, cert_id): """ 删除指定ID的证书 :param cert_id: 证书ID :return: 删除结果 """ if not self.is_logged_in(): print("请先登录") return None url = f"{self.base_url}/api/v1/certs/{cert_id}" try: response = self.session.delete( url, headers=self.headers ) response_data = response.json() if 'err' in response_data: print(f"删除证书失败: {response_data['err']}") return None # 检查删除是否成功 if response_data.get('result') == 'success' and response_data.get('RowsAffected') > 0: print(f"证书 {cert_id} 删除成功") return True else: print(f"证书 {cert_id} 删除失败: 未找到证书或删除操作未生效") return False except requests.exceptions.RequestException as e: print(f"删除证书请求失败: {str(e)}") return None except json.JSONDecodeError as e: print(f"解析响应数据失败: {str(e)}") return None def parse_cert_list(self, cert_list): """ 解析证书列表数据 :param cert_list: 证书列表数据 :return: 解析后的证书信息列表 """ if not cert_list: return None parsed_certs = [] for cert in cert_list: try: # 解析SNI字段(JSON字符串) sni_list = json.loads(cert.get('sni', '[]')) parsed_cert = { 'id': cert.get('id'), 'sni': sni_list, 'expire_time': cert.get('expire_time'), 'update_time': cert.get('update_time') } parsed_certs.append(parsed_cert) except json.JSONDecodeError as e: print(f"解析SNI字段失败: {str(e)}") continue except Exception as e: print(f"解析证书数据失败: {str(e)}") continue return parsed_certs def get_cert_list(self): """ 获取证书列表 :return: 证书列表 """ if not self.is_logged_in(): print("请先登录") return None url = f"{self.base_url}/api/v1/certs/" try: response = self.session.get( url, headers=self.headers ) response_data = response.json() if 'err' in response_data: print(f"获取证书列表失败: {response_data['err']}") return None return response_data except requests.exceptions.RequestException as e: print(f"获取证书列表请求失败: {str(e)}") return None except json.JSONDecodeError as e: print(f"解析响应数据失败: {str(e)}") return None def login(self, username, password, otp=""): """ 登录接口 :param username: 用户名 :param password: 密码 :param otp: 双因素认证码(可选) :return: 登录响应 """ url = f"{self.base_url}/api/v1/users/login" data = { "usr": username, "pwd": password, "otp": otp } try: response = self.session.post( url, headers=self.headers, json=data ) # 获取响应数据 response_data = response.json() # 检查是否有错误信息 if 'err' in response_data: print(f"登录失败: {response_data['err']}") return None # 保存token if 'token' in response_data: self.token = response_data['token'] self._update_headers_with_token() # # 解析token信息 # try: # # 使用 jwt.decode 替代 jwt.decode_complete # token_data = jwt.decode(self.token, options={"verify_signature": False}) # exp_timestamp = token_data.get('exp') # if exp_timestamp: # exp_date = datetime.fromtimestamp(exp_timestamp) # print(f"Token 有效期至: {exp_date}") # except Exception as e: # print(f"无法解析token信息: {str(e)}") return response_data except requests.exceptions.RequestException as e: print(f"登录请求失败: {str(e)}") return None except json.JSONDecodeError as e: print(f"解析响应数据失败: {str(e)}") return None def check_cert(self, cert_content, key_content, mode=0): """ 检查证书 :param cert_content: 证书内容 :param key_content: 私钥内容 :param mode: 模式,默认为0 :return: 检查结果 """ if not self.is_logged_in(): print("请先登录") return None url = f"{self.base_url}/api/v1/certs/check" # 准备multipart/form-data数据 files = { 'mode': (None, str(mode)), 'cert': (None, cert_content), 'key': (None, key_content) } try: # 临时移除content-type,让requests自动设置 headers = self.headers.copy() headers.pop('content-type', None) response = self.session.post( url, headers=headers, files=files ) response_data = response.json() if 'err' in response_data: print(f"证书检查失败: {response_data['err']}") return None return response_data except requests.exceptions.RequestException as e: print(f"证书检查请求失败: {str(e)}") return None except json.JSONDecodeError as e: print(f"解析响应数据失败: {str(e)}") return None def check_cert_from_files(self, cert_file_path, key_file_path, mode=0): """ 从文件检查证书 :param cert_file_path: 证书文件路径 :param key_file_path: 私钥文件路径 :param mode: 模式,默认为0 :return: 检查结果 """ try: with open(cert_file_path, 'r') as f: cert_content = f.read() with open(key_file_path, 'r') as f: key_content = f.read() return self.check_cert(cert_content, key_content, mode) except FileNotFoundError as e: print(f"文件不存在: {str(e)}") return None except Exception as e: print(f"读取文件失败: {str(e)}") return None def submit_cert_config(self, check_result): """ 提交证书配置 :param check_result: 证书检查的结果数据 :return: 提交结果 """ if not self.is_logged_in(): print("请先登录") return None if not check_result: print("无效的证书检查结果") return None url = f"{self.base_url}/api/v1/certs/config" # 准备提交数据 data = { "id": check_result.get("id", 0), "sni": check_result.get("sni", "[]"), "cert": check_result.get("cert", ""), "key": check_result.get("key", ""), "expire_time": check_result.get("expire_time", ""), "update_time": check_result.get("update_time", "") } try: response = self.session.post( url, headers=self.headers, json=data ) response_data = response.json() if 'err' in response_data: print(f"证书配置提交失败: {response_data['err']}") return None return response_data except requests.exceptions.RequestException as e: print(f"证书配置提交请求失败: {str(e)}") return None except json.JSONDecodeError as e: print(f"解析响应数据失败: {str(e)}") return None def is_logged_in(self): """ 检查是否已登录 :return: bool """ return self.token is not None def main(): # 使用示例 api = NanQiangAPI() # 登录信息 username = "obaby" password = "obaby@mars" # 执行登录 result = api.login(username, password) if result: print("登录成功:") print(json.dumps(result, indent=2, ensure_ascii=False)) print(f"Token: {api.token}") # 获取证书列表 cert_list = api.get_cert_list() if cert_list: # 解析证书列表 parsed_certs = api.parse_cert_list(cert_list) if parsed_certs: print("解析后的证书列表:") print(json.dumps(parsed_certs, indent=2, ensure_ascii=False)) # # 删除证书示例 # cert_id = 4 # 要删除的证书ID # delete_result = api.delete_cert(cert_id) # if delete_result: # print(f"证书 {cert_id} 删除成功") # else: # print(f"证书 {cert_id} 删除失败") # 证书检查示例 cert_file = "path/to/cert.pem" key_file = "path/to/key.pem" if os.path.exists(cert_file) and os.path.exists(key_file): return # 先检查证书 cert_result = api.check_cert_from_files(cert_file, key_file) if cert_result: print("证书检查结果:") print(json.dumps(cert_result, indent=2, ensure_ascii=False)) # 提交证书配置 submit_result = api.submit_cert_config(cert_result) if submit_result: print("证书配置提交成功:") print(json.dumps(submit_result, indent=2, ensure_ascii=False)) else: print("证书配置提交失败") else: print("登录失败") if __name__ == "__main__": main()
账号不要设置动态密码,如果设置了,那就创建一个新账号。
获取证书的脚本参考上一篇文章,对应的路径自己调整。更新证书的代码site_cert_auto_update_tool.py:
#!/usr/bin/env python3 import os import subprocess import hashlib import json from datetime import datetime import logging from baby_nanqiang_api_tools import NanQiangAPI # Configuration CERT_SOURCE_DIR = "/root/.acme.sh/h4ck.org.cn_ecc" CERT_FILE = "fullchain.cer" KEY_FILE = "h4ck.org.cn.key" HASH_FILE = "web_cert_hash.json" CERT_SCRIPT = "get_web_cert.sh" def setup_logging(): """设置日志""" logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('web_cert_update.log'), logging.StreamHandler() ] ) def get_file_hash(file_path): """计算文件的SHA-256哈希值""" sha256_hash = hashlib.sha256() with open(file_path, "rb") as f: for byte_block in iter(lambda: f.read(4096), b""): sha256_hash.update(byte_block) return sha256_hash.hexdigest() def save_cert_hash(cert_hash, key_hash): """保存证书和私钥的哈希值到JSON文件""" with open(HASH_FILE, 'w') as f: json.dump({ 'cert_hash': cert_hash, 'key_hash': key_hash }, f) def load_cert_hash(): """从JSON文件加载证书和私钥的哈希值""" try: with open(HASH_FILE, 'r') as f: data = json.load(f) return data.get('cert_hash'), data.get('key_hash') except (FileNotFoundError, json.JSONDecodeError): return None, None def run_get_cert_script(script_path=None): """ 执行获取证书的脚本 :param script_path: 脚本路径,如果为None则使用默认的get_web_cert.sh :return: bool 是否执行成功 """ try: # 如果没有指定脚本路径,使用默认的get_web_cert.sh if script_path is None: script_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), CERT_SCRIPT) # 检查脚本是否存在 if not os.path.exists(script_path): logging.error(f"错误: 脚本文件 {script_path} 不存在") return False # 检查脚本是否可执行 if not os.access(script_path, os.X_OK): logging.error(f"错误: 脚本文件 {script_path} 没有执行权限") return False # 执行脚本 result = subprocess.run(['sh', script_path], capture_output=True, text=True) # 检查执行结果 if result.returncode == 0: logging.info("证书获取脚本执行成功") if result.stdout: logging.info("脚本输出:\n%s", result.stdout) return True else: logging.error(f"证书获取脚本执行异常,返回码: {result.returncode}") if result.stderr: logging.error("异常输出:\n%s", result.stderr) return True except Exception as e: logging.error(f"执行证书获取脚本时发生错误: {str(e)}") return False def read_file_content(file_path): """读取文件内容""" try: with open(file_path, 'r') as f: return f.read() except Exception as e: logging.error(f"读取文件 {file_path} 失败: {str(e)}") return None def is_cert_expired(expire_time_str): """ 检查证书是否过期或即将过期(7天内) :param expire_time_str: 过期时间字符串 :return: bool 是否过期或即将过期 """ try: expire_time = datetime.strptime(expire_time_str, "%Y-%m-%d %H:%M:%S") now = datetime.now() days_until_expire = (expire_time - now).days return days_until_expire <= 7 except Exception as e: logging.error(f"解析过期时间失败: {str(e)}") return False def process_same_sni_certs(api, parsed_certs, current_sni, current_cert_id): """ 处理具有相同SNI的证书 :param api: API实例 :param parsed_certs: 解析后的证书列表 :param current_sni: 当前证书的SNI :param current_cert_id: 当前证书的ID :return: None """ # 筛选出相同SNI的证书 same_sni_certs = [cert for cert in parsed_certs if cert['sni'] == current_sni and cert['id'] != current_cert_id] if not same_sni_certs: return # 按过期时间排序(从早到晚) same_sni_certs.sort(key=lambda x: datetime.strptime(x['expire_time'], "%Y-%m-%d %H:%M:%S")) # 检查是否有过期或即将过期的证书 for cert in same_sni_certs: if is_cert_expired(cert['expire_time']): logging.info(f"删除过期证书 ID: {cert['id']}") if not api.delete_cert(cert['id']): logging.error(f"删除证书 {cert['id']} 失败") # 检查是否有过期时间相同的证书 if len(same_sni_certs) > 1: # 获取第一个证书的过期时间作为基准 base_expire_time = same_sni_certs[0]['expire_time'] # 删除过期时间相同的证书(保留第一个) for cert in same_sni_certs[1:]: if cert['expire_time'] == base_expire_time: logging.info(f"删除重复过期时间的证书 ID: {cert['id']}") if not api.delete_cert(cert['id']): logging.error(f"删除证书 {cert['id']} 失败") def main(): # 设置日志 setup_logging() try: # 执行证书获取脚本 if not run_get_cert_script(): logging.error("获取证书失败,退出程序") return # 检查证书文件是否存在 cert_path = os.path.join(CERT_SOURCE_DIR, CERT_FILE) key_path = os.path.join(CERT_SOURCE_DIR, KEY_FILE) if not (os.path.exists(cert_path) and os.path.exists(key_path)): logging.error("证书文件不存在,退出程序") return # 计算新文件的哈希值 new_cert_hash = get_file_hash(cert_path) new_key_hash = get_file_hash(key_path) # 获取旧的哈希值 old_cert_hash, old_key_hash = load_cert_hash() # 检查文件是否发生变化 if new_cert_hash != old_cert_hash or new_key_hash != old_key_hash: logging.info("证书文件已发生变化,开始更新流程") # 读取证书和私钥内容 cert_content = read_file_content(cert_path) key_content = read_file_content(key_path) if not cert_content or not key_content: logging.error("读取证书文件失败") return # 初始化API api = NanQiangAPI() # 登录 if not api.login("obaby", "obaby@mars"): logging.error("登录失败") return # 检查证书 check_result = api.check_cert(cert_content, key_content) if not check_result: logging.error("证书检查失败") return # 提交证书配置 if not api.submit_cert_config(check_result): logging.error("提交证书配置失败") return # 获取证书列表 cert_list = api.get_cert_list() if not cert_list: logging.error("获取证书列表失败") return # 解析证书列表 parsed_certs = api.parse_cert_list(cert_list) if not parsed_certs: logging.error("解析证书列表失败") return # 获取当前证书的SNI current_sni = check_result.get('sni', '[]') try: current_sni = json.loads(current_sni) except json.JSONDecodeError: logging.error("解析当前证书SNI失败") return # 处理相同SNI的证书 process_same_sni_certs(api, parsed_certs, current_sni, check_result.get('id')) # 保存新的哈希值 save_cert_hash(new_cert_hash, new_key_hash) logging.info("证书更新完成") else: logging.info("证书文件未发生变化,无需更新") except Exception as e: logging.error(f"程序执行出错: {str(e)}", exc_info=True) if __name__ == "__main__": main()
添加定时任务,每天,或者每几天:
0 2 * * * /usr/bin/python3 /home/soft/baby-nanqiang-cert-tools/site_cert_auto_update_tool.py >> /home/soft/baby-nanqiang-cert-tools/web_cert_manager.log 2>&1
最终效果:
26 comments
为什么要谢你,那算推广吗?我写了那么多,怎么没人来谢我,哼
我这是推广加生态完善好吧。丰富他们的技术能力,嘻嘻
自动申请及部署,能部署到cdn上就牛逼了!你这是企业级别的要是能部署到宝塔面板,用户量可能就更大了
cdn有接口也可以 但是没必要 多数cdn都支持自动申请
华为云的CDN就不太行,不过华为云的API是真丰富,调用很方便。
纯cdn不带防御 又是大厂的 直接不敢用
随时被刷
封面图越来越抽象了 哈哈哈哈
哈哈哈
看不懂 但是封面图我的菜~
哈哈哈 好歹有一样
现在SSL周期越来越短了,CF自动SSL挺省心的。 对了我看雷池蛮不错的。
也试过 太多功能不对个人版开放
才发现真是这样已经卸载了雷池
主要是专业版授权太贵了 一年四千
有空我也试试南墙 主要没有多余的机器😂
可以部署在一台机器上
uucorp 是个昵称还是个应用?
组织昵称,哈哈哈,官方名称
现在还有免费的一年期的吗?前段时间我在一个群里问过,有人说有,但是没下文。一直用的腾讯云申请的,手动太麻烦了。
早就没了,腾讯的的确麻烦。
每次看到“南墙”,都有一种不撞南墙不死心的想法
现在免费的SSL证书时间真的越来越短了。卖的证书,加起来比我服务器贵了好几倍,想方设法赚我们的血汗钱,太南了!
就是这个意思,哈哈哈。撞南墙
习惯了一年更换一次SSL证书
一年的有效期真不错,现在找不到免费一年的了
我在宝塔里设了更新SSL证书,不知道效果如何,等到期时候看看。
宝塔的应该可以