極端なデータ希薄シナリオにおける衛星異常応答運用のための教師なし(自己教師あり)時間的パターン発掘
導入:サイレント衛星問題
欧州宇宙機関(ESA)のデータラボでの調査中に、私たちが「サイレント衛星問題」と呼んでいたものに初めて遭遇したときのことを覚えています。私たちは、10年以上前から運用されている地球観測衛星からのテレメトリを分析していましたが、そこでは原因不明の電力変動が現れ始めていました。異常ログはまばらで、意味のある出来事の間隔が数週間ではなく数週間単位で空くこともあり、ラベル付きデータは実質的に存在しませんでした。従来の教師ありアプローチは見事に失敗し、私たちの最良のモデルでも異常検知の精度はせいぜい40%程度でした。
この経験は、宇宙システム向けのAIへの私の考え方を根本から変えました。別の手法を探る中で、最も価値のあるパターンは異常そのものではなく、「一見正常に見える運用」の時間的な関係性の中にあるのだと気づきました。自己教師あり学習の最近の進歩を調べるうちに、極端なデータ希薄性があっても、衛星の全運用履歴を単一の連続した学習シグナルとして扱えるのではないかと分かったのです。
技術的背景:希薄な宇宙データがもたらす課題
衛星の異常応答は、従来の機械学習アプローチでは不十分になってしまう独自の課題を抱えています:
- 極端なデータ希薄性:異常が数か月に1回程度しか起きないことがある
- 高次元の時間データ:複雑に依存し合う何百ものテレメトリチャネル
- 欠落したラベル:ほとんどの異常は、事後に(あるいはそもそも)発見される
- 非定常システム:衛星は経年劣化し、構成要素は劣化し、運用パターンは変化する
時系列に対する自己教師ありアプローチを調査していたとき、コントラスト学習の手法は、陽性例が疎でも頑健な表現を学習できることを見いだしました。鍵となる洞察は、時間的整合性――システムが時間の経過とともにどう振る舞うか――が、明示的なラベルなしで強力な教師信号を提供できるという点でした。
中核となる手法:時間的コントラスト学習
時間的整合性の原則
衛星テレメトリを用いた実験から得られた興味深い結果の一つは、正常な運用は予測可能な時間的パターンを示す一方で、異常はそのパターンを微妙な形で崩していくということでした。これを時間的コントラスト学習によって活用できます:
import torch
import torch.nn as nn
import torch.nn.functional as F
class TemporalContrastiveEncoder(nn.Module):
def __init__(self, input_dim=128, hidden_dim=256, output_dim=128):
super().__init__()
self.temporal_encoder = nn.LSTM(
input_dim, hidden_dim,
num_layers=3,
bidirectional=True,
dropout=0.2,
batch_first=True
)
self.projection_head = nn.Sequential(
nn.Linear(hidden_dim * 2, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, output_dim)
)
def forward(self, x, lengths):
packed = nn.utils.rnn.pack_padded_sequence(
x, lengths.cpu(),
batch_first=True,
enforce_sorted=False
)
outputs, (hidden, cell) = self.temporal_encoder(packed)
hidden = torch.cat([hidden[-2], hidden[-1]], dim=1)
return F.normalize(self.projection_head(hidden), dim=1)
class TemporalContrastiveLoss(nn.Module):
def __init__(self, temperature=0.1):
super().__init__()
self.temperature = temperature
def forward(self, z_i, z_j):
"""正規化された温度スケーリング付きクロスエントロピー損失"""
batch_size = z_i.size(0)
labels = torch.arange(batch_size).to(z_i.device)
# 類似度行列を計算する
features = torch.cat([z_i, z_j], dim=0)
similarity_matrix = F.cosine_similarity(
features.unsqueeze(1),
features.unsqueeze(0),
dim=2
)
# 正のペアのためのマスク
mask = torch.eye(batch_size, dtype=torch.bool).to(z_i.device)
mask = mask.repeat(2, 2)
mask.fill_diagonal_(0)
positives = similarity_matrix[mask].view(2 * batch_size, -1)
negatives = similarity_matrix[~mask].view(2 * batch_size, -1)
logits = torch.cat([positives, negatives], dim=1)
logits /= self.temperature
loss = F.cross_entropy(logits, labels.repeat(2))
return loss
このアプローチを研究することで、時間的な拡張(シフト、スケーリング、時間窓のマスキング)によって正のペアを作成すると、モデルが通常の運用パターンの不変な表現を学習できることを理解しました。
Multi-Scale Temporal Pattern Mining
衛星テレメトリを調査した結果、異常はさまざまな時間スケールで現れることが分かりました。突然のスパイクとして現れるものもあれば、数週間かけて徐々に発展するものもあります。そこで、私は次のようなマルチスケールのアーキテクチャを開発しました:
class MultiScaleTemporalMiner(nn.Module):
def __init__(self, input_channels, scales=[1, 7, 30]):
super().__init__()
self.scales = scales
self.encoders = nn.ModuleList([
TemporalEncoder(input_channels, scale_length=s)
for s in scales
])
返却形式: {"translated": "翻訳されたHTML"}self.attention = nn.MultiheadAttention(
embed_dim=256,
num_heads=8,
dropout=0.1,
batch_first=True
)
self.anomaly_scorer = nn.Sequential(
nn.Linear(256 * len(scales), 512),
nn.ReLU(),
nn.Dropout(0.3),
nn.Linear(512, 256),
nn.ReLU(),
nn.Linear(256, 1)
)
def forward(self, x):
"""マルチスケールの時系列パターンを処理する"""
scale_features = []
for scale, encoder in zip(self.scales, self.encoders):
# 各時間スケールで特徴を抽出する
scaled_x = self._resample_temporal(x, scale)
features = encoder(scaled_x)
scale_features.append(features)
# スケール間の注意機構
combined = torch.cat(scale_features, dim=1)
attended, _ = self.attention(combined, combined, combined)
# 異常スコアリング
scores = self.anomaly_scorer(attended)
return scores, attended
def _resample_temporal(self, x, scale_factor):
"""適応的な時間方向の再サンプリング"""
if scale_factor == 1:
return x
batch_size, seq_len, features = x.shape
new_len = seq_len // scale_factor
# 時間方向の再サンプリングには適応プーリングを使用する
x_reshaped = x.transpose(1, 2).contiguous()
resampled = F.adaptive_avg_pool1d(x_reshaped, new_len)
return resampled.transpose(1, 2).contiguous()
このアーキテクチャを試作している中で、注意機構が異常の種類に応じて異なる時間スケールに重み付けを学習することを観察しました。これにより、失敗モードに関する解釈可能な洞察が得られます。
実装の詳細:極端なスパース性への対応
時系列のための合成少数クラスオーバーサンプリング
私が直面した最大の課題の1つは、クラスの極端な不均衡でした。解決策を調査する中で、時間的な情報を考慮したオーバーサンプリング手法を開発しました:
import numpy as np
from scipy.interpolate import CubicSpline
返却形式: {"translated": "翻訳されたHTML"}class TemporalSMOTE:
def __init__(self, k_neighbors=5, synthetic_ratio=0.5):
self.k = k_neighbors
self.ratio = synthetic_ratio
def generate_synthetic(self, minority_samples, timestamps):
"""生成する合成の時系列シーケンス"""
n_samples, seq_len, n_features = minority_samples.shape
n_synthetic = int(n_samples*self.ratio)
synthetic_samples = []
synthetic_timestamps = []
for _ in range(n_synthetic):
# 2つのサンプルをランダムに選択
idx1, idx2 = np.random.choice(n_samples, 2, replace=False)
# DTW距離を用いた時間方向のアラインメント
aligned_sample = self._temporal_interpolation(
minority_samples[idx1],
minority_samples[idx2],
timestamps[idx1],
timestamps[idx2]
)
synthetic_samples.append(aligned_sample)
# 合成タイムスタンプの生成
synthetic_ts = self._interpolate_timestamp(
timestamps[idx1],
timestamps[idx2]
)
synthetic_timestamps.append(synthetic_ts)
return np.array(synthetic_samples), np.array(synthetic_timestamps)
def _temporal_interpolation(self, seq1, seq2, ts1, ts2):
"""動的時間伸縮(DTW)による補間"""
# DTWベースの補間を簡略化
n_features = seq1.shape[1]
interpolated = np.zeros_like(seq1)
for f in range(n_features):
# 両方のシーケンスに対して三次スプラインを作成
cs1 = CubicSpline(ts1, seq1[:, f])
cs2 = CubicSpline(ts2, seq2[:, f])
# 組み合わせた時点で補間する
combined_ts = np.sort(np.unique(np.concatenate([ts1, ts2])))
interp1 = cs1(combined_ts)
interp2 = cs2(combined_ts)
# 重み付き結合
alpha = np.random.random()
interpolated[:, f] = alpha * interp1 + (1 - alpha) * interp2
return interpolated
さまざまなオーバーサンプリング手法を調べることで、時系列に配慮した手法が、時系列異常検知における従来のSMOTEを大きく上回ることが分かりました。
不確実性を考慮した異常スコアリング
私の研究では、不確実性を定量化することが運用上の意思決定にとって重要であることが明らかになりました。私はベイズ的アプローチを実装しました:
import pyro
import pyro.distributions as dist
from pyro.infer import SVI, Trace_ELBO
from pyro.optim import Adam
class BayesianAnomalyDetector(nn.Module):
def __init__(self, input_dim, hidden_dim=128):
super().__init__()
self.encoder = nn.Sequential(
nn.Linear(input_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, hidden_dim // 2)
)
def model(self, x, y=None):
# 優先分布
w_prior = dist.Normal(torch.zeros_like(self.encoder[0].weight), 1.)
b_prior = dist.Normal(torch.zeros_like(self.encoder[0].bias), 1.)
priors = {'encoder.0.weight': w_prior, 'encoder.0.bias': b_prior}
# パラメータを持ち上げる
lifted_module = pyro.random_module("module", self.encoder, priors)
lifted_reg_model = lifted_module()
with pyro.plate("data", x.shape[0]):
# 順伝播
hidden = lifted_reg_model(x)
mean = hidden.mean(1, keepdim=True)
# 観測モデル
obs = dist.Normal(mean, 0.1)
pyro.sample("obs", obs, obs=y)
def guide(self, x, y=None):
# 変分パラメータ
w_mu = pyro.param("w_mu", torch.randn_like(self.encoder[0].weight))
w_sigma = pyro.param("w_sigma", torch.ones_like(self.encoder[0].weight),
constraint=dist.constraints.positive)
b_mu = pyro.param("b_mu", torch.randn_like(self.encoder[0].bias))
b_sigma = pyro.param("b_sigma", torch.ones_like(self.encoder[0].bias),
constraint=dist.constraints.positive)
# 変分分布
w_dist = dist.Normal(w_mu, w_sigma)
b_dist = dist.Normal(b_mu, b_sigma)
variational = {'encoder.0.weight': w_dist, 'encoder.0.bias': b_dist}
lifted_module = pyro.random_module("module", self.encoder, variational)
return lifted_module()
def predict_with_uncertainty(self, x, n_samples=100):
"""不確実性推定のためのモンテカルロ・ドロップアウト"""
predictions = []
for _ in range(n_samples):
with torch.no_grad():
pred = self.encoder(x)
predictions.append(pred)
predictions = torch.stack(predictions)
mean_pred = predictions.mean(0)
std_pred = predictions.std(0)
return mean_pred, std_pred
ベイズ手法を試している中で、不確実性の推定は、運用シナリオにおいて異常調査の優先順位付けを行ううえで特に価値があることを見いだしました。
実世界の適用例:衛星運用管制センターの統合
自動異常トリアージ・システム
衛星運用に関する実務経験をもとに、統合された異常トリアージ・システムを開発しました:
class SatelliteAnomalyTriage:
def __init__(self, model, confidence_threshold=0.85):
self.model = model
self.confidence_threshold = confidence_threshold
self.anomaly_db = self._initialize_database()
def process_telemetry_stream(self, telemetry_data, metadata):
"""リアルタイムの異常検知とトリアージ"""
anomalies = []
for satellite_id, data in telemetry_data.items():
# 時間的特徴を抽出
features = self._extract_temporal_features(data)
# 不確実性を伴う異常スコアを取得
scores, uncertainty = self.model.predict_with_uncertainty(features)
# 高い確信度を持つ異常を特定
high_conf_mask = (scores > 0.7) & (uncertainty < 0.2)
high_conf_anomalies = scores[high_conf_mask]
if len(high_conf_anomalies) > 0:
# 異常タイプを分類
anomaly_type = self._classify_anomaly(
data, high_conf_anomalies
)
# 深刻度を評価
severity = self._assess_severity(
anomaly_type,
high_conf_anomalies.mean()
)
# 応答計画を生成
response_plan = self._generate_response_plan(
satellite_id,
anomaly_type,
severity,
metadata
)
anomalies.append({
'satellite_id': satellite_id,
'anomaly_type': anomaly_type,
'severity': severity,
'confidence': 1 - uncertainty.mean().item(),
'response_plan': response_plan,
'timestamp': metadata['timestamp']
})
return anomalies
def _classify_anomaly(self, data, anomaly_scores):
"""学習済みの時間的パターンを用いて異常を分類する"""
# パターンのシグネチャを抽出
pattern_features = self._extract_pattern_signatures(data)
# 既知の異常パターンと比較
similarities = self._compute_pattern_similarity(pattern_features)# 最も類似する異常タイプを返す return self.anomaly_db.iloc[similarities.argmax()]['anomaly_type']
def _generate_response_plan(self, satellite_id, anomaly_type, severity, metadata):
"""自動応答の推奨事項を生成する"""
# 類似した異常に対する過去の応答を照会する
historical_responses = self._query_historical_responses(
anomaly_type, severity
)
# 現在の運用状況を考慮する
context = self._assess_operational_context(metadata)
# 優先度付きのアクション一覧を作成する
actions = self._prioritize_actions(
historical_responses,
context,
satellite_id
)
return {
'immediate_actions': actions['immediate'],
'short_term_actions': actions['short_term'],
'long_term_actions': actions['long_term'],
'estimated_risk': self._estimate_risk(severity, context)
}
運用統合を調査する中で、信頼度スコアと推奨アクションを提示すると、オペレーターの応答時間が大幅に改善されることが分かりました。
課題と解決策
課題1:非定常な衛星の挙動
私が遭遇した重要な問題の1つは、衛星が寿命の間ずっと一貫して振る舞うわけではないことです。コンポーネントは劣化し、ソフトウェア更新によって挙動が変わり、軌道調整によって熱特性(サーマルプロファイル)が変化します。
解決策:適応的な正規化(アダプティブ・ノーマライゼーション)の方式を実装しました。
python
class AdaptiveNormalization:
def __init__(self, window_size=1000, adaptation_rate=0.01):
self.window_size = window_size
self.rate = adaptation_rate
self.running_stats = {}
def update_and_normalize(self, satellite_id, new_data):
"""概念ドリフト検出を伴う適応的な正規化"""
if satellite_id not in self.running_stats:
self.running_stats[satellite_id] = {
'mean': new_data.mean(0),
'std': new_data.std(0),
'buffer': []
}
stats = self.running_stats[satellite_id]
# 概念ドリフトを検出する
drift_score = self._detect_drift(new_data, stats)
if drift_score > 0.1: # 有意なドリフトが検出された
# 適応によって統計量を更新する
new_mean = (1 - self.rate) * stats['mean'] + self.rate * new_data.mean(0)
new_std = (1 - self.rate) * stats['std'] + self.rate * new_data.std(0)
stats['mean'] = new_mean
stats['std'] = new_std
# 新しいレジームのためにバッファをクリアする
stats['buffer'] = []
# バッファを更新する
stats['buffer'].append(new_data)
if len(stats['buffer']) > self.window_size:
stats['buffer'].pop(0)
# データを正規化する
normalized = (new_data - stats['mean']) / (stats['std'] + 1e-8)
return normalized, drift_score
def _detect_drift(self, new_data, stats):
"""KLダイバージェンスに基づくドリフト検出"""
# 分布の統計量を計算する
current_mean = stats['mean']
current_std = stats['std']
# 分布間のKLダイバージェンスを計算する
kl_div = self._compute_kl_diverg