
在大促期间,运营人员需要实时监控核心商品的库存余量以及订单增量,防止超卖或库存积压。本文将手把手教你使用Python构建一套零成本的自动化监控与告警系统。该系统支持多商品监控,一旦库存低于阈值或订单量异常,立即通过微信推送告警。
你需要准备一台联网的服务器或本地电脑,并安装Python 3.8及以上版本。请勿使用旧版Python,以免依赖库兼容性问题。打开终端(Terminal或CMD),执行以下命令安装核心依赖库:
pip install requests schedule
这里我们使用requests库进行接口请求,使用schedule库进行定时任务调度。这两个库轻量且稳定,非常适合此类脚本。
为了方便运营人员修改监控参数,我们将所有配置项提取出来,不硬编码在脚本中。请在项目目录下创建一个名为monitor_config.json的文件,并复制以下完整内容:
配置参数详解:
https://sct.ftqq.com/免费申请一个Key填入此处,用于接收微信消息。stock_min表示库存低于此数值触发告警,order_max表示单分钟订单量高于此数值触发异常告警(防刷单或流量激增)。sku_id替换为你们真实的商品ID,api_url替换为你们后端提供的实时库存查询接口地址。在同级目录下创建主脚本promo_monitor.py。这个脚本将包含数据获取、逻辑判断和消息推送三个核心模块。
这是运营人员接收信息的唯一入口,必须保证高可用。代码如下:
```python def send_wechat_alert(title, content): push_key = config.get('wechat_push_key') if not push_key or push_key == "YOUR_SCKEY_HERE": print("错误:未配置微信Push Key,无法发送消息。") return url = f"https://sctapi.ftqq.com/{push_key}.send" payload = { "title": title, "desp": content } try: response = requests.post(url, data=payload, timeout=5) if response.json().get('code') != 0: print(f"消息推送失败:{response.text}") except Exception as e: print(f"消息推送异常:{str(e)}") ```实际场景中,你需要调用后端API。为了确保你能直接运行测试,我在代码中内置了一个模拟模式。如果你有真实接口,请替换fetch_real_data中的逻辑。
这是主逻辑,负责遍历配置文件中的商品,进行阈值比对。
```python def check_inventory(): print(f"开始执行监控任务:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") targets = config.get('targets', []) threshold = config.get('alert_threshold', {}) stock_min = threshold.get('stock_min', 0) order_max = threshold.get('order_max', 99999) for item in targets: sku_id = item['sku_id'] name = item['product_name'] api_url = item['api_url'] stock, orders = fetch_real_data(sku_id, api_url) if stock is None: continue 库存过低告警 if stock < stock_min: msg = f"商品:{name}\nSKU:{sku_id}\n当前库存:{stock}\n状态:低于警戒线 {stock_min}!请及时补货。" print(f"触发告警:{msg}") send_wechat_alert("【库存紧急预警】", msg) 订单量异常告警(防刷或爆单) elif orders > order_max: msg = f"商品:{name}\nSKU:{sku_id}\n近1分钟订单:{orders}\n状态:流量异常!请关注系统负载。" print(f"触发告警:{msg}") send_wechat_alert("【订单异常预警】", msg) else: print(f"监控正常:{name} | 库存: {stock} | 订单: {orders}") ```
很多运营同学卡在不知道如何获取sku_id和接口地址。这里提供一个通用的抓包方法,无需任何专业抓包工具,直接使用Chrome浏览器即可。
步骤如下:
Network选项卡。XHR或Fetch,这通常用于过滤出数据接口请求。stock、detail、price或sku关键字的请求。Headers中的Request URL(这就是你的api_url),查看Payload或Query String Parameters中通常能找到skuId或goods_id。脚本写好后,不能一直挂着命令行窗口。我们需要让它在后台稳定运行。以下是Linux服务器和Windows本机的部署方案。
在Linux环境下,我们使用nohup命令让脚本在后台运行,并将日志输出到文件,方便排查问题。
执行以下命令:
nohup python3 -u promo_monitor.py > monitor.log 2>&1 &
这条命令的含义是:使用Python3运行脚本,将标准输出和错误输出都重定向到monitor.log文件中,&表示在后台运行。
查看实时日志的命令:
tail -f monitor.log
如果需要停止监控,先查找进程ID:

ps -ef | grep promo_monitor.py
然后使用kill命令结束进程:
kill -9 [进程ID]
在Windows上,你可以使用“任务计划程序”来定时启动脚本,或者简单地双击运行,但为了防止误关窗口,建议使用start命令最小化运行。
创建一个run.bat文件,内容如下:
双击这个bat文件,脚本会在一个最小化的CMD窗口中运行。关闭窗口即停止脚本。
在实操过程中,可能会遇到网络波动导致的接口超时。本脚本已在fetch_real_data函数中加入了try-except捕获,防止程序因单次请求失败而崩溃。如果发现monitor.log中频繁出现超时错误,建议在requests.get()中增加timeout参数,或者检查服务器的出网带宽。确保系统时间准确,否则schedule库的定时任务可能会出现偏差。












易频IT社区是综合性互联网IT技术门户网站,专注分享网络技术、服务器运维、网络安全、编程开发、系统架构、云计算、大数据等行业干货,实时更新IT行业资讯、零基础教程、实战案例,为IT从业者、技术爱好者提供专业的学习交流平台。
Copyright © 2021-2026 易频IT社区. All Rights Reserved. 备案号:闽ICP备2023013482号 网站地图