侧边栏壁纸
  • 累计撰写 54 篇文章
  • 累计创建 31 个标签
  • 累计收到 1 条评论

目 录CONTENT

文章目录

记录公众号通过自动回复比较cpu性能的整个过程

nankle
2024-10-03 / 0 评论 / 0 点赞 / 10 阅读 / 13696 字
温馨提示:
本文最后更新于 2024-10-03,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

本文仅限学习交流,学习如何开发程序,如文章中使用的信息侵犯了相关方权益,请联系作者删除,作者不对下面信息负有任何责任

数据获取

通过passMark获取数据,并存入mysql数据库中

https://www.cpubenchmark.net/CPU_mega_page.html

开发程序对关键词解析并返回

代码如下,在windows下运行

from selenium import webdriver
from selenium.webdriver.common.by import By
import mysql.connector
from mysql.connector import Error
import time

def cpu():
    # 初始化 WebDriver
    driver = webdriver.Chrome()
    
    try:
        # 访问目标 URL
        url = "https://www.cpubenchmark.net/CPU_mega_page.html"
        driver.get(url)
        
        # 等待页面加载完成(可选)
        driver.implicitly_wait(10)  # 等待 10 秒
        # 再暂停5秒是为了手动选择all获取全部
        print("开始暂停...")
        time.sleep(5)  # 暂停5秒
        print("暂停结束,继续执行程序。")

        # 定位表格元素
        table = driver.find_element(By.XPATH, "//table[@class='dataTable-blue dataTable no-footer']")
        
        # 建立数据库连接
        connection = mysql.connector.connect(
            host='mysqlIp',
            database='dbname',
            user='username',
            password='password'
        )
        
        if connection.is_connected():
            cursor = connection.cursor()
            print("Database connection established.")
            
            # 清空目标表
            truncate_table(cursor)

            # 提取表格数据
            rows = table.find_elements(By.TAG_NAME, "tr")
            inserted_count = 0
            
            for row in rows:
                cells = row.find_elements(By.TAG_NAME, "td")
                if len(cells) >= 7:
                    name = cells[1].text.strip()
                    cores = cells[2].text.strip()
                    mark = cells[3].text.strip()
                    tdp = cells[5].text.strip()
                    socket = cells[6].text.strip()
                    category = cells[7].text.strip()
                    
                    # 插入数据到数据库
                    insert_data(cursor, name, cores, mark, tdp, socket, category)
                    
                    inserted_count += 1
                    print(f"Inserted data: Name={name}, Cores={cores}, Mark={mark}, TDP={tdp}, Socket={socket}, Category={category}")
            
            # 判断是否需要交换表
            if inserted_count > get_cpus_count(cursor):
                swap_tables(cursor)
                print("Tables swapped successfully.")
            else:
                print("Data count is less than existing data; no update performed.")
            
            # 提交事务
            connection.commit()
    
    except Error as e:
        print("Error while connecting to MySQL", e)
    finally:
        # 关闭 WebDriver 和数据库连接
        driver.quit()
        if connection.is_connected():
            cursor.close()
            connection.close()
            print("MySQL connection is closed")

def truncate_table(cursor):
    """清空目标表"""
    query = "TRUNCATE TABLE cpus_tmp;"
    cursor.execute(query)
    print("table 'cpus_tmp' TRUNCATED.")

def insert_data(cursor, name, cores, mark, tdp, socket, category):
    """插入数据到cpus_tmp表"""
    print(f"start insert :  Name: {name}, Cores: {cores}, Mark: {mark}, TDP: {tdp}, Socket: {socket}, Category: {category}")
    query = """
    INSERT INTO cpus_tmp (name, cores, mark, tdp, socket, category)
    VALUES (%s, %s, %s, %s, %s, %s);
    """
    values = (name, cores, mark, tdp, socket, category)
    cursor.execute(query, values)

def get_cpus_count(cursor):
    """获取cpus表中的记录数"""
    query = "SELECT COUNT(*) FROM cpus;"
    cursor.execute(query)
    result = cursor.fetchone()
    return int(result[0]) if result and len(result) > 0 else 0

def swap_tables(cursor):
    """交换cpus_tmp和cpus表"""
    query = """
    RENAME TABLE cpus TO temp, cpus_tmp TO cpus, temp TO cpus_tmp;
    """
    cursor.execute(query)

# 调用函数
cpu()

以上代码执行后将数据存入数据库,以便后面的应用使用。

开发python的web应用,并放入docker运行

将 Flask 应用程序放入 Docker 容器中并通过 Nginx 代理访问,可以按照以下步骤进行:

步骤 1:准备 Flask 应用程序

确保你的 Flask 应用程序已经准备好,并且能够正常运行。这里以一个简单的 Flask 应用为例:

from flask import Flask, request, make_response
import mysql.connector
from mysql.connector import Error
import xml.etree.ElementTree as ET
import time
import hashlib

app = Flask(__name__)

# 配置 MySQL 数据库连接参数
db_config = {
    'host': 'localhost',
    'user': 'your_username',
    'password': 'your_password',
    'database': 'your_database'
}

# 微信 Token
WECHAT_TOKEN = 'your_token_here'

def verify_signature(signature, timestamp, nonce):
    """ 验证微信服务器发送的签名 """
    # 将 token、timestamp 和 nonce 按照字典序排序
    sorted_params = sorted([WECHAT_TOKEN, timestamp, nonce])
    # 拼接字符串
    concatenated_string = ''.join(sorted_params)
    # 使用 SHA1 计算哈希值
    hash_object = hashlib.sha1(concatenated_string.encode('utf-8'))
    calculated_signature = hash_object.hexdigest()
    # 比较计算得到的哈希值与 signature
    return calculated_signature == signature

def get_reply(keyword):
    """ 从数据库中查询关键词对应的回复信息 """
    try:
        connection = mysql.connector.connect(**db_config)
        if connection.is_connected():
            cursor = connection.cursor(dictionary=True)
            query = "SELECT reply FROM data WHERE keyword = %s"
            cursor.execute(query, (keyword,))
            result = cursor.fetchone()
            cursor.close()
            return result['reply'] if result else None
    except Error as e:
        print(f"Error connecting to MySQL Platform: {e}")
    finally:
        if connection.is_connected():
            connection.close()
    return "查询失败,请稍后重试!"

@app.route('/wechat', methods=['GET', 'POST'])
def wechat():
    if request.method == 'GET':
        # 验证请求
        signature = request.args.get('signature', '')
        timestamp = request.args.get('timestamp', '')
        nonce = request.args.get('nonce', '')
        echostr = request.args.get('echostr', '')

        # 验证 signature
        if verify_signature(signature, timestamp, nonce):
            # 如果验证通过,返回 echostr
            return make_response(echostr)
        else:
            # 如果验证不通过,返回错误信息
            return make_response("Invalid signature"), 403

    elif request.method == 'POST':
        # 解析 XML 格式的消息
        xml_data = request.data.decode('utf-8')
        root = ET.fromstring(xml_data)
        
        # 获取消息类型
        msg_type = root.find('MsgType').text
        
        # 检查消息类型是否为 text
        if msg_type == 'text':
            # 获取消息内容
            content = root.find('Content').text
            
            # 设置几个关键词及其对应的回复
            keywords = {
                '关键词1': '关键词1的回复',
                '关键词2': '关键词2的回复',
                '关键词3': '关键词3的回复'
            }
            
            # 检查消息内容中是否包含关键词
            for keyword, reply in keywords.items():
                if keyword in content:
                    # 构建回复的 XML 格式消息
                    response_xml = f"""
                    <xml>
                      <ToUserName><![CDATA[{root.find('FromUserName').text}]]></ToUserName>
                      <FromUserName><![CDATA[{root.find('ToUserName').text}]]></FromUserName>
                      <CreateTime>{int(time.time())}</CreateTime>
                      <MsgType><![CDATA[text]]></MsgType>
                      <Content><![CDATA[{reply}]]></Content>
                      <FuncFlag>0</FuncFlag>
                    </xml>
                    """
                    return make_response(response_xml)

            # 如果没有匹配的关键词,查询数据库
            reply = get_reply(content)
            if reply:
                # 构建回复的 XML 格式消息
                response_xml = f"""
                <xml>
                  <ToUserName><![CDATA[{root.find('FromUserName').text}]]></ToUserName>
                  <FromUserName><![CDATA[{root.find('ToUserName').text}]]></FromUserName>
                  <CreateTime>{int(time.time())}</CreateTime>
                  <MsgType><![CDATA[text]]></MsgType>
                  <Content><![CDATA[{reply}]]></Content>
                  <FuncFlag>0</FuncFlag>
                </xml>
                """
                return make_response(response_xml)

            # 如果数据库查询失败,返回默认回复
            default_reply = "查询失败,请稍后重试!"
            response_xml = f"""
            <xml>
              <ToUserName><![CDATA[{root.find('FromUserName').text}]]></ToUserName>
              <FromUserName><![CDATA[{root.find('ToUserName').text}]]></FromUserName>
              <CreateTime>{int(time.time())}</CreateTime>
              <MsgType><![CDATA[text]]></MsgType>
              <Content><![CDATA[{default_reply}]]></Content>
              <FuncFlag>0</FuncFlag>
            </xml>
            """
            return make_response(response_xml)

        # 如果不是 text 类型的消息,返回默认响应
        default_response = f"""
        <xml>
          <ToUserName><![CDATA[{root.find('FromUserName').text}]]></ToUserName>
          <FromUserName><![CDATA[{root.find('ToUserName').text}]]></FromUserName>
          <CreateTime>{int(time.time())}</CreateTime>
          <MsgType><![CDATA[text]]></MsgType>
          <Content><![CDATA[仅支持文本消息]]></Content>
          <FuncFlag>0</FuncFlag>
        </xml>
        """
        return make_response(default_response)

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000, debug=False)

进行本地测试:

curl -X GET      "http://localhost:5000/wechat?signature=9b1fc0530a7d30b8ae3004aea631b59c5c1134d2&timestamp=123456789&nonce=987654321&echostr=test_echo_str22"

可以正常返回test_echo_str22,这个要注意的是signature必须是计算好的。

curl -X POST      -H "Content-Type: application/xml"      -d '<xml><ToUserName><![CDATA[toUser]]></ToUserName><FromUserName><![CDATA[fromUser]]></FromUserName><CreateTime>123456789</CreateTime><MsgType><![CDATA[text]]></MsgType><Content><![CDATA[59]]></Content><MsgId>1234567890123456</MsgId></xml>'      http://localhost:5000/wechat

返回正常:

                <xml>

                  <ToUserName><![CDATA[fromUser]]></ToUserName>

                  <FromUserName><![CDATA[toUser]]></FromUserName>

                  <CreateTime>1726403271</CreateTime>

                  <MsgType><![CDATA[text]]></MsgType>

                  <Content><![CDATA[[5-Way CPU] AMD Ryzen 9 5950X]]></Content>

                  <FuncFlag>0</FuncFlag>

                </xml>

步骤 2:创建 Dockerfile

在 Flask 应用所在的目录下创建一个 Dockerfile 文件,用于构建 Docker 镜像:

# 使用官方 Python 镜像作为基础镜像
FROM python:3.8-slim

# 设置工作目录
WORKDIR /app

# 将应用代码复制到容器中
COPY . .

# 安装 Flask 和其他依赖
RUN pip install --no-cache-dir flask

# 暴露端口
EXPOSE 5000

# 运行 Flask 应用
CMD ["python", "app.py"]

步骤 3:构建 Docker 镜像

在包含 Dockerfile 的目录下运行以下命令来构建 Docker 镜像:

docker build -t your_flask_app .

其中 your_flask_app 是你为 Docker 镜像指定的标签。

步骤 4:运行 Docker 容器

运行 Docker 容器,并将容器端口映射到宿主机端口:

docker run -d -p 5000:5000 your_flask_app

这将启动一个 Docker 容器,并将容器内部的 5000 端口映射到宿主机的 5000 端口。

步骤 5:配置 Nginx 作为反向代理

假设你的 Nginx 服务器已经在运行,并且监听在 80 端口。你需要在 Nginx 的配置文件中添加一条新的 location 规则,将请求转发到 Flask 应用所在的端口(在这个例子中是 5000 端口)。编辑 Nginx 配置文件(通常是 /etc/nginx/nginx.conf 或 /etc/nginx/sites-enabled/default):

server {
    listen 80;
    server_name example.com;

    location /wechat {
        proxy_pass http://localhost:5000/wechat;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
    }
}


步骤 6:重新加载 Nginx 配置

重新加载 Nginx 配置,使新的 location 规则生效:

sudo systemctl reload nginx
或者,如果你使用的是 Nginx 的非系统服务管理方式:

sudo nginx -s reload


总结

通过以上步骤,将 Flask 应用程序放入 Docker 容器中,并通过 Nginx 作为反向代理来访问该应用。现在,所有的针对 /wechat 路径的请求都将被转发到运行在 Docker 容器中的 Flask 应用。

Flask 应用作为web服务,接收微信请求并从数据库中查询数据然后返回。

0

评论区