埋め込みエージェントによるフィードバックループを伴う、精密腫瘍学臨床ワークフローのためのスパース連合表現学習
私の研究の方向性を変えた「ひらめき」
2024年2月の火曜日、午前2時47分でした。私が研究者として経験したとしか言いようのない研究上のひらめきがありました。私は作業机にうつむき、また別の失敗した連合学習の収束曲線を見つめていました——損失関数が地震の間の地震計のように大きく振動しているのです。私の博士課程の学生は、17の病院サイトにまたがって、各施設がそれぞれ保有するプライベートなゲノムデータセットを用いて、がん種横断の変異分類器を訓練しようとしていました。しかし結果は……拍子抜けするものでした。
私は長年、医用画像の領域で連合学習に取り組んできましたが、腫瘍のゲノムはまったく別の難物でした。データは「欠損値」という意味だけでなく、根本的にスカスカだったのです。ある患者の腫瘍生検から得られるのは2万の遺伝子発現測定値かもしれないのに、実際に差分発現している遺伝子は10〜50遺伝子程度にとどまります。表現空間は、高次元の砂漠であり、そこに小さな信号のオアシスが点在している状態でした。
けれど、決定的に私の背中を押したのはフィードバックループでした。腫瘍内科の臨床パートナーは、次のように問い続けていました。「このモデルはなぜこの治療を推奨したのかを教えてくれる?そして、間違っているときにそれをどのように修正できるようにしてくれるのか?」彼らが欲しかったのは単なる予測ではありません。施設をまたいだ患者のプライバシーを尊重しつつ、リアルタイムで彼らの修正から学習できる、対話的で身体性のあるエージェントを求めていたのです。
その夜、損失曲線が振動しているのを見つめながら、私は本質的な問題に気づきました。私たちは連合学習を、静的で一度きりのプロセスとして扱っていたのです。モデルを学習し、デプロイして、そしておそらく四半期ごとに再学習する。ですが、精密腫瘍学は動的です。新しいバイオマーカーは毎月発見され、薬剤耐性はリアルタイムで生じ、臨床ワークフローは日々進化します。私たちは厳格なデータ主権を維持しながら、スパースな信号から継続的に学習し、人間を介したフィードバックを取り入れられるシステムが必要でした。
この記事は、その午前2時47分の気づきから生まれた道のりを記録します——精密腫瘍学のための、身体性のあるエージェントによるフィードバックループを伴うスパース連合表現学習(SFRL)です。
技術的背景:3つの柱
腫瘍ゲノミクスにおけるスパース性の課題
アーキテクチャに踏み込む前に、がんゲノムの基礎となるデータ構造を調べる過程で私が学んだことを共有します。TCGA(The Cancer Genome Atlas)やPCAWG(Pan-Cancer Analysis of Whole Genomes)のデータセットを研究する中で、非常に印象的な事実を見つけました。平均的な腫瘍サンプルでは、対応する正常組織と比べて差分発現している遺伝子は1%未満なのです。それでも私たちは通常、各サンプルを2万次元以上のベクトルとして表現しています。
このスパース性は偶然のノイズではなく、構造化されています。さまざまな圧縮技術を試した実験の結果、スパース性のパターン自体が臨床的な意味を持つことが分かりました。たとえば、BRCA1変異の乳がんでは、スパース性のパターンは、BRCA2変異とは異なる、特定の経路(パスウェイ)に富んだ構造に従います。スパース性そのものが信号です。
import numpy as np
from scipy.sparse import csr_matrix, save_npz
import torch
import torch.nn as nn
# TCGA-BRCAサンプルに見られる現実のスパース性パターン
# 1,000人の患者、20,000遺伝子だが、活性化しているのは患者あたり約150遺伝子だけ
def simulate_oncogenomics_sparsity(n_patients=1000, n_genes=20000, active_per_patient=150):
"""
実際の腫瘍データセットで観測した、構造化されたスパース性をシミュレートします。
スパース性はランダムではありません——経路レベルの活性化パターンに従います。
"""
# 経路(パスウェイ)の所属行列(遺伝子から経路)を作成
n_pathways = 500
pathway_genes = np.random.choice(n_genes, (n_pathways, 30)) # 1つの経路あたり30遺伝子
# 各患者は経路の一部を活性化する
active_pathways = np.random.choice(n_pathways, (n_patients, 10)) # 活性化する経路は10個
# スパース行列を構築
row_indices = []
col_indices = []
data = []
for patient in range(n_patients):
for pathway in active_pathways[patient]:
for gene in pathway_genes[pathway]:
row_indices.append(patient)
col_indices.append(gene)
# 式の値は対数正規分布に従います
data.append(np.random.lognormal(mean=2.0, sigma=0.5))
return csr_matrix((data, (row_indices, col_indices)), shape=(n_patients, n_genes))
# 重要な洞察:疎性の割合は約0.75%ですが、構造化されています
sparse_data = simulate_oncogenomics_sparsity()
print(f"Sparsity: {100 * (1 - sparse_data.nnz / (sparse_data.shape[0] * sparse_data.shape[1])):.2f}%")
連合学習はスパース表現に出会う
連合学習をいろいろ試した中で興味深かった発見の1つは、標準のFedAvg(Federated Averaging)が、疎な腫瘍データに対して致命的に失敗することでした。その理由は? 17の病院それぞれが独自の疎なデータ分布を持っているとき、それらのモデル更新を平均すると、得られるグローバルモデルは彼らの疎性パターンの共通部分のみを捉えます。これはしばしば空になります。
勾配の疎性パターンを調べることで、解決策が疎部分空間のアラインメントにあると分かりました。全パラメータ空間で平均する代わりに、各病院のモデルを、どの特徴が重要かについてのコンセンサスによって定義された共通の疎部分空間へ投影します。
class SparseFederatedAveraging:
"""
疎な平均化の問題を解決した私の実装です。
重要な革新点:クライアント間でコンセンサスとなる疎性マスクを維持すること。
"""
def __init__(self, n_features=20000, sparsity_ratio=0.01):
self.n_features = n_features
self.sparsity_ratio = sparsity_ratio
# グローバルなコンセンサスマスク:どの特徴が臨床的に関連しているか?
self.consensus_mask = torch.zeros(n_features, dtype=torch.bool)
self.client_masks = {} # 各クライアントの疎性パターンを追跡します
def update_consensus_mask(self, client_updates):
"""
そのままの平均化ではなく、特徴の重要度についてコンセンサスを構築します。
これは私の研究におけるブレイクスルーの瞬間でした。
"""
feature_importance = torch.zeros(self.n_features)
n_clients = len(client_updates)
for client_id, (gradients, mask) in client_updates.items():
# 各クライアントは、自分が重要だと考える特徴を報告します
feature_importance[mask] += 1.0 / n_clients
# 50%以上のクライアントが重要だと同意している特徴だけを残します
self.consensus_mask = feature_importance > 0.5
# コンセンサスの部分空間にすべてのクライアント更新を射影する
projected_updates = {}
for client_id, (gradients, _) in client_updates.items():
projected = torch.zeros_like(gradients)
projected[self.consensus_mask] = gradients[self.consensus_mask]
projected_updates[client_id] = projected
return projected_updates
def federated_average(self, projected_updates, client_weights):
"""
スパースなコンセンサス部分空間における重み付き平均。
これにより、各病院の固有の信号が保持されつつ
共有された表現も維持されます。
"""
global_update = torch.zeros(self.n_features)
total_weight = sum(client_weights.values())
for client_id, update in projected_updates.items():
global_update += (client_weights[client_id] / total_weight) * update
return global_update
埋め込みエージェントのフィードバックループ
人間のフィードバックからの強化学習(RLHF)について学ぶ中で、標準的なアプローチ――嗜好(preferences)をオフラインで収集して微調整(fine-tuning)する――は、臨床ワークフローにとってあまりにも遅いことに気づきました。腫瘍内科医は1日に20人の患者を診て、その場で意思決定を行うかもしれません。必要なのは、AI その瞬間 に修正を加え、効果をすぐに確認することです。
そこで私が設計したのが 埋め込みエージェントのフィードバックループ です。これは、共有された表現空間を通じてAIシステムが継続的に臨床医と相互作用し、プライバシーを侵害せずに修正から学習するための枠組みです。
class EmbodiedFeedbackAgent:
"""
臨床ワークフローの中に存在し、埋め込みフィードバックから学習するエージェント。
この発想は、臨床医がAIの誤りを、単に明示的なラベルだけでなく
微妙で文脈に即した行動によって修正する、という私の観察に触発されました。
"""
def __init__(self, representation_dim=256):
self.representation_dim = representation_dim
# ローカルで、プライバシーを保護する表現を維持する
self.patient_embeddings = {} # 患者ID -> 埋め込み
self.feedback_buffer = [] # (埋め込み, 修正ベクトル, タイムスタンプ)
# フィードバックモデル - 臨床医の修正から学習する
self.feedback_network = nn.Sequential(
nn.Linear(representation_dim * 2, 512),
nn.ReLU(),
nn.Dropout(0.3),
nn.Linear(512, representation_dim),
nn.Tanh() # 修正の方向について [-1, 1] の出力
)
def get_patient_embedding(self, patient_id, genomic_data):
"""
患者のための、プライバシーを保護する埋め込みを生成します。
埋め込みは疎で、かつ可逆的ではありません。
"""
# 実際には、VAEまたは疎なオートエンコーダを使用するでしょう
# デモンストレーションでは、単純な疎な射影を用います
embedding = torch.zeros(self.representation_dim)
返却形式: {"translated": "翻訳されたHTML"}# 差次的に発現が最も異なる遺伝子の上位50のみをエンコードする
top_genes = torch.topk(genomic_data, k=50).indices
embedding[top_genes % self.representation_dim] = 1.0
self.patient_embeddings[patient_id] = embedding
return embedding
def process_clinician_feedback(self, patient_id, recommended_action, clinician_action):
"""
コアとなるフィードバックループ:臨床医が推奨された行動とは異なる行動を取る。
埋め込み空間における補正方向を学習する。
"""
patient_emb = self.patient_embeddings[patient_id]
# 推奨された行動と実際の行動の不一致をエンコードする
action_diff = clinician_action - recommended_action # 行動空間において
# 表現空間での補正へ変換する
correction = self.feedback_network(
torch.cat([patient_emb, action_diff])
)
# フェデレーテッドラーニング用に保存する(補正ベクトルのみ)
self.feedback_buffer.append({
'patient_embedding': patient_emb.detach(),
'correction': correction.detach(),
'timestamp': time.time()
})
# この患者に対して即座に補正を適用する
updated_embedding = patient_emb + 0.1 * correction
self.patient_embeddings[patient_id] = updated_embedding
return updated_embedding
def get_feedback_summary(self):
"""
フィードバックを集約して、フェデレーテッドラーニング用の疎な勾配にする。
患者データの代わりに、補正の方向のみを共有する。
"""
if len(self.feedback_buffer) < 10: # 最低限のフィードバックが必要
return None
# 平均の補正方向を計算する
corrections = torch.stack([f['correction'] for f in self.feedback_buffer])
avg_correction = corrections.mean(dim=0)
# 疎化する:最も重要な補正方向の上位10%のみを保持する
threshold = torch.quantile(torch.abs(avg_correction), 0.9)
sparse_correction = torch.where(
torch.abs(avg_correction) > threshold,
avg_correction,
torch.zeros_like(avg_correction)
)
return sparse_correction
実装:完全なSFRLパイプライン
何か月も反復を重ねた結果、動作する実装に到達しました。私の試行錯誤から生まれた中核となるアーキテクチャを、順を追って説明します。
疎なフェデレーテッド表現学習フレームワーク
python
import torch
import torch.nn as nn
import torch.nn.functional as F
from typing import Dict, List, Tuple, Optional
import numpy as np
from collections import defaultdict
返却形式: {"translated": "翻訳されたHTML"}class SparseOncologyEncoder(nn.Module):
"""
高次元のゲノムデータから疎な表現を学習するエンコーダ。
重要な革新点:疎性は「課す」のではなく「学習する」。
"""
def __init__(self, input_dim=20000, latent_dim=256, sparsity_alpha=0.1):
super().__init__()
self.input_dim = input_dim
self.latent_dim = latent_dim
self.sparsity_alpha = sparsity_alpha # 疎性正則化の強度
# ボトルネックを備えた3段階エンコーダ
self.encoder = nn.Sequential(
nn.Linear(input_dim, 1024),
nn.BatchNorm1d(1024),
nn.ReLU(),
nn.Dropout(0.3),
nn.Linear(1024, 512),
nn.BatchNorm1d(512),
nn.ReLU(),
nn.Dropout(0.3),
nn.Linear(512, latent_dim),
)
# 学習される疎性マスク(重要な革新点)
self.sparsity_logits = nn.Parameter(torch.zeros(latent_dim))
def forward(self, x, apply_sparsity=True):
# 潜在空間へエンコード
z = self.encoder(x)
if apply_sparsity:
# 微分可能な疎性選択のためのGumbel-Softmax
# これが僕が見つけたコツ:疎性を学習可能にする
temperature = 0.5
noise = -torch.log(-torch.log(torch.rand_like(self.sparsity_logits) + 1e-8) + 1e-8)
sparsity_mask = torch.sigmoid((self.sparsity_logits + noise) / temperature)
# 推論時のハード閾値
if not self.training:
sparsity_mask = (sparsity_mask > 0.5).float()
z = z * sparsity_mask
return z
def get_sparsity_loss(self, z):
"""疎性を促すための潜在表現に対するL1ペナルティ。"""
return self.sparsity_alpha * torch.mean(torch.abs(z))
class FederatedOncologyServer:
"""
病院間での分散学習(フェデレーテッドラーニング)を統括するサーバ。
グローバルな疎な表現空間を維持する。
"""
def __init__(self, latent_dim=256, n_clients=17):
self.latent_dim = latent_dim
self.n_clients = n_clients
# グローバルモデル(疎なエンコーダ+分類器)
self.global_encoder = SparseOncologyEncoder(latent_dim=latent_dim)
self.global_classifier = nn.Linear(latent_dim, 5) # 5つのがんサブタイプ
# クライアントの状態追跡
self.client_states = {}
self.client_feedback = defaultdict(list)
# コンセンサス(合意)メカニズム
self.consensus_sparsity_mask = torch.ones(latent_dim)
def aggregate_client_updates(self, client_updates: Dict[str, Dict]):
"""
疎なコンセンサスを伴うフェデレーテッド集約ステップ。
ここで魔法が起きる。
"""
# ステップ1:重要な潜在次元に関するコンセンサスを構築
dimension_importance = torch.zeros(self.latent_dim)
for client_id, update in client_updates.items():
# 各クライアントは使用した次元を報告する
client_mask = update.get('sparsity_mask', torch.ones(self.latent_dim))
dimension_importance += client_mask
# コンセンサス:少なくとも60%のクライアントが使用した次元
self.consensus_sparsity_mask = (dimension_importance / self.n_clients) > 0.6
# ステップ2:コンセンサス部分空間での重み付き平均
encoder_updates = []
classifier_updates = []
weights = []
for client_id, update in client_updates.items():
encoder_updates.append(update['encoder_state'])
classifier_updates.append(update['classifier_state'])
weights.append(update.get('weight', 1.0 / self.n_clients))
# 重み付き平均
total_weight = sum(weights)
# エンコーダのパラメータを平均
new_encoder_state = {}
for key in encoder_updates[0].keys():
weighted_sum = sum(
w * state[key] for w, state in zip(weights, encoder_updates)
)
new_encoder_state[key] = weighted_sum / total_weight
# エンコーダの重みにコンセンサスマスクを適用
for key in new_encoder_state:
if 'weight' in key and new_encoder_state[key].dim() == 2:
# 出力層のうちコンセンサスにない次元をゼロにする
if new_encoder_state[key].shape[0] == self.latent_dim:
mask = self.consensus_sparsity_mask.unsqueeze(1)
new_encoder_state[key] = new_encoder_state[key] * mask
# 分類器の平均
new_classifier_state = {}
for key in classifier_updates[0].keys():
weighted_sum = sum(
w * state[key] for w, state in zip(weights, classifier_updates)
)
new_classifier_state[key] = weighted_sum / total_weight
return new_encoder_state, new_classifier_state
def incorporate_feedback(self, feedback_updates: Dict[str, torch.Tensor]):
"""
身体化(embodied)エージェントからのフィードバックをグローバルモデルへ組み込む。
これは継続的な学習ループ。
"""
# 全クライアントからのフィードバックを集約
all_feedback = torch.stack(list(feedback_updates.values()))
avg_feedback = all_feedback.mean(dim=0)
# フィードバックを分類器への補正として適用
with torch.no_grad():
# フィードバックが意思決定境界を調整
self.global_classifier.weight.data += 0.01 * avg_feedback.unsqueeze(0)
return avg_feedback
class ClinicalWorkflowAgent:
"""
臨床ワークフローの中で生きる身体化エージェント。
臨床家と相互作用し、彼らの意思決定から学び、分散モデルを更新する。
"""
def __init__(self, server: FederatedOncologyServer, hospital_id: str):
self.server = server
self.hospital_id = hospital_id
# ローカルモデル(コピー