import pandas as pd import numpy as np from scipy import signal # === 1. 字段定义 === # 空地信号:12通道 (左4, 前4, 右4) GEARWOW_COLS = ( [f'GEARWOWL_{i}' for i in range(1, 5)] + [f'GEARWOWN_{i}' for i in range(1, 5)] + [f'GEARWOWR_{i}' for i in range(1, 5)] ) # 横向加速度:16通道 (16Hz) ACCLAT_COLS = [f'ACCLAT_{i}' for i in range(1, 17)] # 滚转角:4通道 (4Hz) - 假设数据源已按4Hz对齐或插值,此处定义为4列 ROLL_COLS = [f'ROLL_{i}' for i in range(1, 5)] # === 2. 数据加载与清洗 === df = pd.DataFrame() all_cols = GEARWOW_COLS + ACCLAT_COLS + ROLL_COLS # 模拟获取数据 (实际环境中 getList 函数由平台提供) for col in all_cols: df[col] = getList(col) # 清洗无效值 df = df.replace([None, '', 'NA', 'null', 'NaN'], pd.NA) # 数值型转换 df[ACCLAT_COLS + ROLL_COLS] = df[ACCLAT_COLS + ROLL_COLS].apply(pd.to_numeric, errors='coerce') # 空地状态标准化 (假设原始数据可能是 'AIR'/'GND' 或 0/1,统一转为布尔值:True=Ground) # 这里假设原始数据是字符串 'GND'/'AIR',如果是数字 1/0 请调整逻辑 for gw_col in GEARWOW_COLS: df[gw_col] = df[gw_col].astype(str).str.strip().str.upper() # 只要有一个轮子接地,即为 Ground 状态 df['IS_GROUND'] = df[GEARWOW_COLS].eq('GND').any(axis=1) # 后向填充防止瞬时丢点影响状态判断 df = df.fillna(method='bfill') # === 3. 设置参数 === FREQ_ACCLAT = 16.0 # 横向加速度频率 FREQ_ROLL = 4.0 # 滚转角频率 WINDOW_SEC = 5.0 # 监控窗口:落地后5秒 # 阈值定义 (基于历史最小故障案例:0.228g, 1.4deg) # 设定略低于历史极值以覆盖边界情况 THRESH_LAT_HIGH = 0.25 # 高风险阈值 THRESH_LAT_LOW = 0.20 # 低风险阈值 (覆盖 0.228) THRESH_ROLL_HIGH = 2.0 # 高风险阈值 THRESH_ROLL_LOW = 1.3 # 低风险阈值 (覆盖 1.4) # === 4. 核心逻辑:检测 Air -> Ground 转换点 === # 计算前一行的地面状态 df['PREV_IS_GROUND'] = df['IS_GROUND'].shift(1).fillna(False) # 找到从 False (Air) 变为 True (Ground) 的索引位置 transition_indices = df.index[(~df['PREV_IS_GROUND']) & (df['IS_GROUND'])].tolist() max_status = 0 # 默认无风险 # === 5. 遍历每个着陆事件进行窗口分析 === for idx in transition_indices: # 确定窗口范围 [当前行, 当前行 + 5秒] # 由于不同参数频率不同,我们按行数计算 # ACCLAT 是 16Hz, 5秒 = 80行 # ROLL 是 4Hz, 5秒 = 20行 (但在DataFrame中通常行号是同步的时间轴,假设DataFrame是最高频16Hz对齐的) # 如果DataFrame是16Hz基准,ROLL列会有重复值或插值,直接切片即可 start_row = idx end_row = idx + int(WINDOW_SEC * FREQ_ACCLAT) # 按16Hz计算行数偏移 if end_row > len(df): end_row = len(df) window_df = df.iloc[start_row:end_row] if window_df.empty: continue # --- 计算窗口内最大绝对横向加速度 (16通道取最大) --- # 展平窗口内所有ACCLAT列的数据,取绝对值最大值 lat_values = window_df[ACCLAT_COLS].values.flatten() lat_values = lat_values[~np.isnan(lat_values)] # 去除NaN if len(lat_values) == 0: max_abs_lat = 0 else: max_abs_lat = np.max(np.abs(lat_values)) # --- 计算窗口内最大绝对滚转角 (4通道取最大) --- # 注意:如果DF是16Hz,ROLL列可能每4行重复一次,或者包含NaN,需清洗 roll_values = window_df[ROLL_COLS].values.flatten() roll_values = roll_values[~np.isnan(roll_values)] if len(roll_values) == 0: max_abs_roll = 0 else: max_abs_roll = np.max(np.abs(roll_values)) # --- 判决逻辑 --- current_status = 0 # 必须同时满足横向和滚转条件 (AND逻辑) if max_abs_lat >= THRESH_LAT_LOW and max_abs_roll >= THRESH_ROLL_LOW: if max_abs_lat >= THRESH_LAT_HIGH and max_abs_roll >= THRESH_ROLL_HIGH: current_status = 10 # 高风险:类似大角度侧风或严重错位前兆 else: current_status = 5 # 中风险:覆盖历史最小故障案例 (0.228g, 1.4deg) # 更新全局最高状态 if current_status > max_status: max_status = current_status # === 6. 输出结果 === # 最终输出:仅一个整数 (0=正常, 5=关注, 10=警告) print(int(max_status))