上传文件至 红外
This commit is contained in:
206
红外/irre(1).py
Normal file
206
红外/irre(1).py
Normal file
@@ -0,0 +1,206 @@
|
||||
import os
|
||||
import re
|
||||
import json
|
||||
import struct
|
||||
from datetime import datetime
|
||||
|
||||
# 常量定义
|
||||
STEPS = [
|
||||
{"desc": "1. [基础] 按电源键开机", "cmd": "power_on"},
|
||||
{"desc": "2. [基础] 按电源键关机", "cmd": "power_off"},
|
||||
*[{"desc": f"3.{i+1} [制冷] 设置{i+16}℃", "cmd": f"cool_{i+16}"} for i in range(5)],#基础温度为16
|
||||
*[{"desc": f"4.{i+1} [制热] 设置{i+16}℃", "cmd": f"heat_{i+16}"} for i in range(5)],
|
||||
{"desc": "5.1 [模式] 制冷模式", "cmd": "mode_cool"},
|
||||
{"desc": "5.2 [模式] 除湿模式", "cmd": "mode_dry"},
|
||||
{"desc": "5.3 [模式] 送风模式", "cmd": "mode_fan"},
|
||||
{"desc": "5.4 [模式] 上下风", "cmd": "up_down"},#上下风
|
||||
{"desc": "6.1 [风速] 左右风", "cmd": "left_right"},#左右风
|
||||
{"desc": "6.2 [风速] 低速", "cmd": "fan_low"},
|
||||
{"desc": "6.3 [风速] 中速", "cmd": "fan_mid"},
|
||||
{"desc": "6.4 [风速] 高速", "cmd": "fan_high"},
|
||||
{"desc": "7.1 [特殊] 超强风速", "cmd": "turbo_on"},#超强风速
|
||||
{"desc": "7.2 [特殊] 静音风速", "cmd": "turbo_off"},#静音风速
|
||||
{"desc": "7.5 [特殊] 睡眠模式开", "cmd": "sleep_on"},
|
||||
{"desc": "7.6 [特殊] 睡眠模式关", "cmd": "sleep_off"},
|
||||
|
||||
|
||||
|
||||
|
||||
]
|
||||
|
||||
def validate_signal(data):
|
||||
"""增强版信号验证"""
|
||||
pattern = r"""
|
||||
uint16_t\s+rawData\[(\d+)\]\s*=\s* # 匹配数组声明
|
||||
{([\d,\s]+)};?[\s\S]*? # 捕获数组内容
|
||||
Protocol\s*=\s*(\w+) # 协议类型
|
||||
(?:.*?(\d+)\s+bits)? # 可选位数匹配
|
||||
"""
|
||||
match = re.search(pattern, data, re.VERBOSE | re.IGNORECASE)
|
||||
|
||||
if not match:
|
||||
return False, None, None, None
|
||||
|
||||
try:
|
||||
arr_length = int(match.group(1))
|
||||
raw_data = list(map(int, match.group(2).split(',')))
|
||||
protocol = match.group(3).upper()
|
||||
bits = int(match.group(4)) if match.group(4) else len(raw_data)*16
|
||||
|
||||
# 数据长度校验
|
||||
if len(raw_data) != arr_length:
|
||||
print(f"数据长度不匹配: 声明{arr_length} 实际{len(raw_data)}")
|
||||
return False, None, None, None
|
||||
|
||||
return raw_data, protocol, bits, len(data)
|
||||
except Exception as e:
|
||||
print(f"解析错误: {str(e)}")
|
||||
return False, None, None, None
|
||||
|
||||
def analyze_signals(collected_data):
|
||||
"""执行控制变量分析"""
|
||||
analysis = {"protocols": {}}
|
||||
|
||||
# 按协议分组
|
||||
protocol_groups = {}
|
||||
for cmd, data in collected_data.items():
|
||||
proto = data['protocol']
|
||||
protocol_groups.setdefault(proto, []).append(data)
|
||||
|
||||
# 分析每个协议
|
||||
for proto, group in protocol_groups.items():
|
||||
proto_analysis = {
|
||||
"sample_count": len(group),
|
||||
"fixed_bits": [],
|
||||
"variable_bits": [],
|
||||
"template": []
|
||||
}
|
||||
|
||||
# 位级分析
|
||||
max_length = max(len(d["raw"]) for d in group)
|
||||
bit_analysis = [[] for _ in range(max_length)]
|
||||
|
||||
for data in group:
|
||||
for i, val in enumerate(data["raw"]):
|
||||
bit_analysis[i].append(val)
|
||||
|
||||
# 识别固定/可变位
|
||||
for idx, values in enumerate(bit_analysis):
|
||||
unique_vals = set(values)
|
||||
if len(unique_vals) == 1:
|
||||
proto_analysis["fixed_bits"].append({
|
||||
"position": idx,
|
||||
"value": values[0]
|
||||
})
|
||||
else:
|
||||
proto_analysis["variable_bits"].append({
|
||||
"position": idx,
|
||||
"values": list(unique_vals)
|
||||
})
|
||||
|
||||
# 生成协议模板
|
||||
proto_analysis["template"] = [
|
||||
"V" if any(idx == b["position"] for b in proto_analysis["variable_bits"])
|
||||
else group[0]["raw"][idx]
|
||||
for idx in range(len(group[0]["raw"]))
|
||||
]
|
||||
|
||||
analysis["protocols"][proto] = proto_analysis
|
||||
|
||||
return analysis
|
||||
|
||||
def collect_signals():
|
||||
"""主采集流程"""
|
||||
collected = {}
|
||||
print("\n空调红外信号采集系统启动")
|
||||
|
||||
for step_num, step in enumerate(STEPS, 1):
|
||||
while True:
|
||||
print(f"\n[{step_num}/{len(STEPS)}] {step['desc']}")
|
||||
print("请输入红外信号数据(输入'exit'退出):")
|
||||
data = input().strip()
|
||||
|
||||
if data.lower() == 'exit':
|
||||
if input("确认退出?(y/n): ").lower() == 'y':
|
||||
save_data(collected)
|
||||
return
|
||||
|
||||
valid, raw, proto, bits = validate_signal(data)
|
||||
if valid:
|
||||
collected[step['cmd']] = {
|
||||
"raw": raw,
|
||||
"protocol": proto,
|
||||
"bits": bits,
|
||||
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||
}
|
||||
print(f"√ 已记录:{step['cmd']}")
|
||||
break
|
||||
print("× 格式错误!需包含:\n1.完整C数组 2.协议信息 3.正确数据格式")
|
||||
|
||||
analysis = analyze_signals(collected)
|
||||
save_data(collected, analysis)
|
||||
print(f"\n采集完成!有效数据:{len(collected)}条")
|
||||
|
||||
def convert_to_bin(json_path):
|
||||
"""将JSON配置转换为二进制格式"""
|
||||
with open(json_path, 'r') as f:
|
||||
config = json.load(f)
|
||||
|
||||
bin_data = bytearray()
|
||||
for cmd in config['raw_data'].values():
|
||||
# 协议类型(4字节) + 位数(4字节) + 时间戳(12字节)
|
||||
bin_data.extend(struct.pack('<I', cmd['protocol']))
|
||||
bin_data.extend(struct.pack('<I', cmd['bits']))
|
||||
|
||||
# 时间编码调整(年-2000压缩到1字节,其他字段范围校验)
|
||||
dt = datetime.strptime(cmd['timestamp'], "%Y-%m-%d %H:%M:%S")
|
||||
year_byte = dt.year - 2000
|
||||
if not (0 <= year_byte <= 255):
|
||||
year_byte = max(0, min(year_byte, 255))
|
||||
time_bytes = struct.pack('<6B',
|
||||
year_byte,
|
||||
max(1, min(dt.month, 12)),
|
||||
max(1, min(dt.day, 31)),
|
||||
dt.hour,
|
||||
dt.minute,
|
||||
dt.second
|
||||
)
|
||||
bin_data.extend(time_bytes)
|
||||
|
||||
bin_path = os.path.splitext(json_path)[0] + '.bin'
|
||||
with open(bin_path, 'wb') as f:
|
||||
f.write(bin_data)
|
||||
print(f"二进制配置已生成: {os.path.basename(bin_path)}")
|
||||
|
||||
|
||||
def save_data(data, analysis=None):
|
||||
"""保存数据到文件"""
|
||||
config = {
|
||||
"metadata": {
|
||||
"create_time": datetime.now().isoformat(),
|
||||
"total_commands": len(data)
|
||||
},
|
||||
"raw_data": data,
|
||||
"analysis": analysis if analysis else {}
|
||||
}
|
||||
|
||||
file_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), "ir_config.json")
|
||||
with open(file_path, "w") as f:
|
||||
json.dump(config, f, indent=2)
|
||||
print("配置已保存至 ir_config.json")
|
||||
convert_to_bin(file_path)
|
||||
|
||||
if __name__ == "__main__":
|
||||
print("="*40)
|
||||
print("红外信号采集分析系统 v2.0")
|
||||
print("操作说明:")
|
||||
print("1. 保持设备连接")
|
||||
print("2. 按提示操作遥控器")
|
||||
print("3. 从串口监视器复制数据")
|
||||
print("="*40)
|
||||
|
||||
try:
|
||||
collect_signals()
|
||||
except KeyboardInterrupt:
|
||||
print("\n操作中断!正在保存数据...")
|
||||
save_data(collected if 'collected' in locals() else {})
|
||||
Reference in New Issue
Block a user