南墙 WAF 系列(二)– 网站证书自动更新

相对管理后台的 ssl来说,其实网站的 ssl 证书才是正事,毕竟这个关系到网站的访问。按照官方的说法在开放 80 端口的情况下,南墙可以自动申请更新证书,不过后台没找到配置的地方,我的 v4 的 80 也是不通的,所以就需要自己去维护管理证书了。

然而,上午在问了管理之后,得到的答复是没有 api,可以自己抓包进行修改。

嗐,这么看来其实也没啥,最起码说明后台的 api 接口是可以直接拿来用的。即使是有 api 文档,也是得自己去看,去写,没有的话 curl 抓包一样能解决问题。按照之前的方法,只直接复制 curl 给 cursor 就可以了。

api 文件baby_nanqiang_api_tools.py内容:

#!/usr/bin/env python3
import requests
import json
import jwt
from datetime import datetime
import os
import urllib3

# 禁用 SSL 验证警告
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

class NanQiangAPI:
    def __init__(self, base_url="https://lang.bi:443"):
        self.base_url = base_url
        self.session = requests.Session()
        self.session.verify = False  # 忽略SSL证书验证
        self.token = None
        self._setup_headers()

    def _setup_headers(self):
        """设置请求头"""
        self.headers = {
            'accept': 'application/json, text/plain, */*',
            'accept-language': 'zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7',
            'cache-control': 'no-cache',
            'content-type': 'application/json',
            'origin': self.base_url,
            'pragma': 'no-cache',
            'priority': 'u=1, i',
            'referer': f'{self.base_url}/',
            'sec-ch-ua': '"Chromium";v="134", "Not:A-Brand";v="24", "Google Chrome";v="134"',
            'sec-ch-ua-mobile': '?0',
            'sec-ch-ua-platform': '"macOS"',
            'sec-fetch-dest': 'empty',
            'sec-fetch-mode': 'cors',
            'sec-fetch-site': 'same-origin',
            'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36'
        }

    def _update_headers_with_token(self):
        """更新请求头,添加token"""
        if self.token:
            self.headers['Authorization'] = self.token  # 直接使用token,不添加'Bearer '前缀

    def delete_cert(self, cert_id):
        """
        删除指定ID的证书
        :param cert_id: 证书ID
        :return: 删除结果
        """
        if not self.is_logged_in():
            print("请先登录")
            return None

        url = f"{self.base_url}/api/v1/certs/{cert_id}"

        try:
            response = self.session.delete(
                url,
                headers=self.headers
            )
            
            response_data = response.json()
            
            if 'err' in response_data:
                print(f"删除证书失败: {response_data['err']}")
                return None
            
            # 检查删除是否成功
            if response_data.get('result') == 'success' and response_data.get('RowsAffected') > 0:
                print(f"证书 {cert_id} 删除成功")
                return True
            else:
                print(f"证书 {cert_id} 删除失败: 未找到证书或删除操作未生效")
                return False
            
        except requests.exceptions.RequestException as e:
            print(f"删除证书请求失败: {str(e)}")
            return None
        except json.JSONDecodeError as e:
            print(f"解析响应数据失败: {str(e)}")
            return None

    def parse_cert_list(self, cert_list):
        """
        解析证书列表数据
        :param cert_list: 证书列表数据
        :return: 解析后的证书信息列表
        """
        if not cert_list:
            return None

        parsed_certs = []
        for cert in cert_list:
            try:
                # 解析SNI字段(JSON字符串)
                sni_list = json.loads(cert.get('sni', '[]'))
                
                parsed_cert = {
                    'id': cert.get('id'),
                    'sni': sni_list,
                    'expire_time': cert.get('expire_time'),
                    'update_time': cert.get('update_time')
                }
                parsed_certs.append(parsed_cert)
            except json.JSONDecodeError as e:
                print(f"解析SNI字段失败: {str(e)}")
                continue
            except Exception as e:
                print(f"解析证书数据失败: {str(e)}")
                continue

        return parsed_certs

    def get_cert_list(self):
        """
        获取证书列表
        :return: 证书列表
        """
        if not self.is_logged_in():
            print("请先登录")
            return None

        url = f"{self.base_url}/api/v1/certs/"

        try:
            response = self.session.get(
                url,
                headers=self.headers
            )
            
            response_data = response.json()
            
            if 'err' in response_data:
                print(f"获取证书列表失败: {response_data['err']}")
                return None
                
            return response_data
            
        except requests.exceptions.RequestException as e:
            print(f"获取证书列表请求失败: {str(e)}")
            return None
        except json.JSONDecodeError as e:
            print(f"解析响应数据失败: {str(e)}")
            return None

    def login(self, username, password, otp=""):
        """
        登录接口
        :param username: 用户名
        :param password: 密码
        :param otp: 双因素认证码(可选)
        :return: 登录响应
        """
        url = f"{self.base_url}/api/v1/users/login"
        data = {
            "usr": username,
            "pwd": password,
            "otp": otp
        }

        try:
            response = self.session.post(
                url,
                headers=self.headers,
                json=data
            )
            
            # 获取响应数据
            response_data = response.json()
            
            # 检查是否有错误信息
            if 'err' in response_data:
                print(f"登录失败: {response_data['err']}")
                return None
            
            # 保存token
            if 'token' in response_data:
                self.token = response_data['token']
                self._update_headers_with_token()
                
                # # 解析token信息
                # try:
                #     # 使用 jwt.decode 替代 jwt.decode_complete
                #     token_data = jwt.decode(self.token, options={"verify_signature": False})
                #     exp_timestamp = token_data.get('exp')
                #     if exp_timestamp:
                #         exp_date = datetime.fromtimestamp(exp_timestamp)
                #         print(f"Token 有效期至: {exp_date}")
                # except Exception as e:
                #     print(f"无法解析token信息: {str(e)}")
                
            return response_data
            
        except requests.exceptions.RequestException as e:
            print(f"登录请求失败: {str(e)}")
            return None
        except json.JSONDecodeError as e:
            print(f"解析响应数据失败: {str(e)}")
            return None

    def check_cert(self, cert_content, key_content, mode=0):
        """
        检查证书
        :param cert_content: 证书内容
        :param key_content: 私钥内容
        :param mode: 模式,默认为0
        :return: 检查结果
        """
        if not self.is_logged_in():
            print("请先登录")
            return None

        url = f"{self.base_url}/api/v1/certs/check"
        
        # 准备multipart/form-data数据
        files = {
            'mode': (None, str(mode)),
            'cert': (None, cert_content),
            'key': (None, key_content)
        }

        try:
            # 临时移除content-type,让requests自动设置
            headers = self.headers.copy()
            headers.pop('content-type', None)
            
            response = self.session.post(
                url,
                headers=headers,
                files=files
            )
            
            response_data = response.json()
            
            if 'err' in response_data:
                print(f"证书检查失败: {response_data['err']}")
                return None
                
            return response_data
            
        except requests.exceptions.RequestException as e:
            print(f"证书检查请求失败: {str(e)}")
            return None
        except json.JSONDecodeError as e:
            print(f"解析响应数据失败: {str(e)}")
            return None

    def check_cert_from_files(self, cert_file_path, key_file_path, mode=0):
        """
        从文件检查证书
        :param cert_file_path: 证书文件路径
        :param key_file_path: 私钥文件路径
        :param mode: 模式,默认为0
        :return: 检查结果
        """
        try:
            with open(cert_file_path, 'r') as f:
                cert_content = f.read()
            with open(key_file_path, 'r') as f:
                key_content = f.read()
                
            return self.check_cert(cert_content, key_content, mode)
            
        except FileNotFoundError as e:
            print(f"文件不存在: {str(e)}")
            return None
        except Exception as e:
            print(f"读取文件失败: {str(e)}")
            return None

    def submit_cert_config(self, check_result):
        """
        提交证书配置
        :param check_result: 证书检查的结果数据
        :return: 提交结果
        """
        if not self.is_logged_in():
            print("请先登录")
            return None

        if not check_result:
            print("无效的证书检查结果")
            return None

        url = f"{self.base_url}/api/v1/certs/config"
        
        # 准备提交数据
        data = {
            "id": check_result.get("id", 0),
            "sni": check_result.get("sni", "[]"),
            "cert": check_result.get("cert", ""),
            "key": check_result.get("key", ""),
            "expire_time": check_result.get("expire_time", ""),
            "update_time": check_result.get("update_time", "")
        }

        try:
            response = self.session.post(
                url,
                headers=self.headers,
                json=data
            )
            
            response_data = response.json()
            
            if 'err' in response_data:
                print(f"证书配置提交失败: {response_data['err']}")
                return None
                
            return response_data
            
        except requests.exceptions.RequestException as e:
            print(f"证书配置提交请求失败: {str(e)}")
            return None
        except json.JSONDecodeError as e:
            print(f"解析响应数据失败: {str(e)}")
            return None

    def is_logged_in(self):
        """
        检查是否已登录
        :return: bool
        """
        return self.token is not None

def main():
    # 使用示例
    api = NanQiangAPI()
    
    # 登录信息
    username = "obaby"
    password = "obaby@mars"
    
    # 执行登录
    result = api.login(username, password)
    
    if result:
        print("登录成功:")
        print(json.dumps(result, indent=2, ensure_ascii=False))
        print(f"Token: {api.token}")
        
        # 获取证书列表
        cert_list = api.get_cert_list()
        if cert_list:
            # 解析证书列表
            parsed_certs = api.parse_cert_list(cert_list)
            if parsed_certs:
                print("解析后的证书列表:")
                print(json.dumps(parsed_certs, indent=2, ensure_ascii=False))
                
                # # 删除证书示例
                # cert_id = 4  # 要删除的证书ID
                # delete_result = api.delete_cert(cert_id)
                # if delete_result:
                #     print(f"证书 {cert_id} 删除成功")
                # else:
                #     print(f"证书 {cert_id} 删除失败")
        
        # 证书检查示例
        cert_file = "path/to/cert.pem"
        key_file = "path/to/key.pem"
        
        if os.path.exists(cert_file) and os.path.exists(key_file):
            return
            # 先检查证书
            cert_result = api.check_cert_from_files(cert_file, key_file)
            if cert_result:
                print("证书检查结果:")
                print(json.dumps(cert_result, indent=2, ensure_ascii=False))
                
                # 提交证书配置
                submit_result = api.submit_cert_config(cert_result)
                if submit_result:
                    print("证书配置提交成功:")
                    print(json.dumps(submit_result, indent=2, ensure_ascii=False))
                else:
                    print("证书配置提交失败")
    else:
        print("登录失败")

if __name__ == "__main__":
    main()

账号不要设置动态密码,如果设置了,那就创建一个新账号。

获取证书的脚本参考上一篇文章,对应的路径自己调整。更新证书的代码site_cert_auto_update_tool.py:

#!/usr/bin/env python3
import os
import subprocess
import hashlib
import json
from datetime import datetime
import logging
from baby_nanqiang_api_tools import NanQiangAPI

# Configuration
CERT_SOURCE_DIR = "/root/.acme.sh/h4ck.org.cn_ecc"
CERT_FILE = "fullchain.cer"
KEY_FILE = "h4ck.org.cn.key"
HASH_FILE = "web_cert_hash.json"
CERT_SCRIPT = "get_web_cert.sh"

def setup_logging():
    """设置日志"""
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler('web_cert_update.log'),
            logging.StreamHandler()
        ]
    )

def get_file_hash(file_path):
    """计算文件的SHA-256哈希值"""
    sha256_hash = hashlib.sha256()
    with open(file_path, "rb") as f:
        for byte_block in iter(lambda: f.read(4096), b""):
            sha256_hash.update(byte_block)
    return sha256_hash.hexdigest()

def save_cert_hash(cert_hash, key_hash):
    """保存证书和私钥的哈希值到JSON文件"""
    with open(HASH_FILE, 'w') as f:
        json.dump({
            'cert_hash': cert_hash,
            'key_hash': key_hash
        }, f)

def load_cert_hash():
    """从JSON文件加载证书和私钥的哈希值"""
    try:
        with open(HASH_FILE, 'r') as f:
            data = json.load(f)
            return data.get('cert_hash'), data.get('key_hash')
    except (FileNotFoundError, json.JSONDecodeError):
        return None, None

def run_get_cert_script(script_path=None):
    """
    执行获取证书的脚本
    :param script_path: 脚本路径,如果为None则使用默认的get_web_cert.sh
    :return: bool 是否执行成功
    """
    try:
        # 如果没有指定脚本路径,使用默认的get_web_cert.sh
        if script_path is None:
            script_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), CERT_SCRIPT)
        
        # 检查脚本是否存在
        if not os.path.exists(script_path):
            logging.error(f"错误: 脚本文件 {script_path} 不存在")
            return False
            
        # 检查脚本是否可执行
        if not os.access(script_path, os.X_OK):
            logging.error(f"错误: 脚本文件 {script_path} 没有执行权限")
            return False
            
        # 执行脚本
        result = subprocess.run(['sh', script_path], 
                              capture_output=True, 
                              text=True)
        
        # 检查执行结果
        if result.returncode == 0:
            logging.info("证书获取脚本执行成功")
            if result.stdout:
                logging.info("脚本输出:\n%s", result.stdout)
            return True
        else:
            logging.error(f"证书获取脚本执行异常,返回码: {result.returncode}")
            if result.stderr:
                logging.error("异常输出:\n%s", result.stderr)
            return True
            
    except Exception as e:
        logging.error(f"执行证书获取脚本时发生错误: {str(e)}")
        return False

def read_file_content(file_path):
    """读取文件内容"""
    try:
        with open(file_path, 'r') as f:
            return f.read()
    except Exception as e:
        logging.error(f"读取文件 {file_path} 失败: {str(e)}")
        return None

def is_cert_expired(expire_time_str):
    """
    检查证书是否过期或即将过期(7天内)
    :param expire_time_str: 过期时间字符串
    :return: bool 是否过期或即将过期
    """
    try:
        expire_time = datetime.strptime(expire_time_str, "%Y-%m-%d %H:%M:%S")
        now = datetime.now()
        days_until_expire = (expire_time - now).days
        return days_until_expire <= 7
    except Exception as e:
        logging.error(f"解析过期时间失败: {str(e)}")
        return False

def process_same_sni_certs(api, parsed_certs, current_sni, current_cert_id):
    """
    处理具有相同SNI的证书
    :param api: API实例
    :param parsed_certs: 解析后的证书列表
    :param current_sni: 当前证书的SNI
    :param current_cert_id: 当前证书的ID
    :return: None
    """
    # 筛选出相同SNI的证书
    same_sni_certs = [cert for cert in parsed_certs 
                     if cert['sni'] == current_sni and cert['id'] != current_cert_id]
    
    if not same_sni_certs:
        return
        
    # 按过期时间排序(从早到晚)
    same_sni_certs.sort(key=lambda x: datetime.strptime(x['expire_time'], "%Y-%m-%d %H:%M:%S"))
    
    # 检查是否有过期或即将过期的证书
    for cert in same_sni_certs:
        if is_cert_expired(cert['expire_time']):
            logging.info(f"删除过期证书 ID: {cert['id']}")
            if not api.delete_cert(cert['id']):
                logging.error(f"删除证书 {cert['id']} 失败")
    
    # 检查是否有过期时间相同的证书
    if len(same_sni_certs) > 1:
        # 获取第一个证书的过期时间作为基准
        base_expire_time = same_sni_certs[0]['expire_time']
        
        # 删除过期时间相同的证书(保留第一个)
        for cert in same_sni_certs[1:]:
            if cert['expire_time'] == base_expire_time:
                logging.info(f"删除重复过期时间的证书 ID: {cert['id']}")
                if not api.delete_cert(cert['id']):
                    logging.error(f"删除证书 {cert['id']} 失败")

def main():
    # 设置日志
    setup_logging()
    
    try:
        # 执行证书获取脚本
        if not run_get_cert_script():
            logging.error("获取证书失败,退出程序")
            return
            
        # 检查证书文件是否存在
        cert_path = os.path.join(CERT_SOURCE_DIR, CERT_FILE)
        key_path = os.path.join(CERT_SOURCE_DIR, KEY_FILE)
        
        if not (os.path.exists(cert_path) and os.path.exists(key_path)):
            logging.error("证书文件不存在,退出程序")
            return
            
        # 计算新文件的哈希值
        new_cert_hash = get_file_hash(cert_path)
        new_key_hash = get_file_hash(key_path)
        
        # 获取旧的哈希值
        old_cert_hash, old_key_hash = load_cert_hash()
        
        # 检查文件是否发生变化
        if new_cert_hash != old_cert_hash or new_key_hash != old_key_hash:
            logging.info("证书文件已发生变化,开始更新流程")
            
            # 读取证书和私钥内容
            cert_content = read_file_content(cert_path)
            key_content = read_file_content(key_path)
            
            if not cert_content or not key_content:
                logging.error("读取证书文件失败")
                return
                
            # 初始化API
            api = NanQiangAPI()
            
            # 登录
            if not api.login("obaby", "obaby@mars"):
                logging.error("登录失败")
                return
                
            # 检查证书
            check_result = api.check_cert(cert_content, key_content)
            if not check_result:
                logging.error("证书检查失败")
                return
                
            # 提交证书配置
            if not api.submit_cert_config(check_result):
                logging.error("提交证书配置失败")
                return
                
            # 获取证书列表
            cert_list = api.get_cert_list()
            if not cert_list:
                logging.error("获取证书列表失败")
                return
                
            # 解析证书列表
            parsed_certs = api.parse_cert_list(cert_list)
            if not parsed_certs:
                logging.error("解析证书列表失败")
                return
                
            # 获取当前证书的SNI
            current_sni = check_result.get('sni', '[]')
            try:
                current_sni = json.loads(current_sni)
            except json.JSONDecodeError:
                logging.error("解析当前证书SNI失败")
                return
                
            # 处理相同SNI的证书
            process_same_sni_certs(api, parsed_certs, current_sni, check_result.get('id'))
            
            # 保存新的哈希值
            save_cert_hash(new_cert_hash, new_key_hash)
            logging.info("证书更新完成")
        else:
            logging.info("证书文件未发生变化,无需更新")
            
    except Exception as e:
        logging.error(f"程序执行出错: {str(e)}", exc_info=True)

if __name__ == "__main__":
    main()

添加定时任务,每天,或者每几天:

0 2 * * * /usr/bin/python3 /home/soft/baby-nanqiang-cert-tools/site_cert_auto_update_tool.py >> /home/soft/baby-nanqiang-cert-tools/web_cert_manager.log 2>&1

最终效果:

☆版权☆

* 网站名称:obaby@mars
* 网址:https://obaby.org.cn/
* 个性:https://oba.by/
* 本文标题: 《南墙 WAF 系列(二)– 网站证书自动更新》
* 本文链接:https://obaby.org.cn/2025/04/20086
* 短链接:https://oba.by/?p=20086
* 转载文章请标明文章来源,原文标题以及原文链接。请遵从 《署名-非商业性使用-相同方式共享 2.5 中国大陆 (CC BY-NC-SA 2.5 CN) 》许可协议。


You may also like

26 comments

  1.   Level 7
    Google Chrome 134 Google Chrome 134 Mac OS X 10.15 Mac OS X 10.15 cn中国–浙江–杭州 联通

    为什么要谢你,那算推广吗?我写了那么多,怎么没人来谢我,哼

    1. 公主 Queen 
      Google Chrome 134 Google Chrome 134 Mac OS X 10.15 Mac OS X 10.15 cn中国–山东–青岛 联通

      我这是推广加生态完善好吧。丰富他们的技术能力,嘻嘻

  2.  Level 3
    Google Chrome 86 Google Chrome 86 Windows 10 Windows 10 cn中国 中国移动

    自动申请及部署,能部署到cdn上就牛逼了!你这是企业级别的要是能部署到宝塔面板,用户量可能就更大了

    1. 公主 Queen 
      Google Chrome 134 Google Chrome 134 Android 10 Android 10 cn中国–山东–青岛 联通

      cdn有接口也可以 但是没必要 多数cdn都支持自动申请

      1. Level 5
        Google Chrome 132 Google Chrome 132 Android 10 Android 10 cn中国 中国电信

        华为云的CDN就不太行,不过华为云的API是真丰富,调用很方便。

        1. 公主 Queen 
          Google Chrome 134 Google Chrome 134 Android 10 Android 10 cn中国–山东–青岛 联通

          纯cdn不带防御 又是大厂的 直接不敢用
          随时被刷

  3. Level 2
    Google Chrome 129 Google Chrome 129 Windows 10 Windows 10 cn中国–陕西–西安 电信

    现在SSL周期越来越短了,CF自动SSL挺省心的。 对了我看雷池蛮不错的。

  4.  Level 6
    IBrowse r IBrowse r Android 12 Android 12 cn中国–河南–漯河 联通

    现在还有免费的一年期的吗?前段时间我在一个群里问过,有人说有,但是没下文。一直用的腾讯云申请的,手动太麻烦了。

  5. Level 2
    Google Chrome 133 Google Chrome 133 Windows 11 Windows 11 cn中国 中国移动

    每次看到“南墙”,都有一种不撞南墙不死心的想法 dance rofl现在免费的SSL证书时间真的越来越短了。卖的证书,加起来比我服务器贵了好几倍,想方设法赚我们的血汗钱,太南了!

    1. 公主 Queen 
      Google Chrome 132 Google Chrome 132 Windows 10 Windows 10 cn中国 中国联通

      一年的有效期真不错,现在找不到免费一年的了

  6.  Level 3
    Microsoft Edge 134 Microsoft Edge 134 Windows 11 Windows 11 cn中国–江苏–苏州 电信

    我在宝塔里设了更新SSL证书,不知道效果如何,等到期时候看看。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注