Skip to content

Training Methodology

Golem is trained via Behavioral Cloning (BC), a foundational paradigm of Imitation Learning (IL). By treating the expert's gameplay traces as the optimal policy \(\pi^*\), the training regime is formulated as a supervised, multi-label sequence classification task over continuous time-series data.

1. Contiguous Temporal Loading

Because Liquid Neural Networks (LNNs) and their Closed-form Continuous (CfC) approximations model a continuous hidden state \(x(t)\), individual frames cannot be uniformly shuffled during training. The dataset pipeline enforces temporal causality via a contiguous sequence extraction protocol, dynamically loading tensors from isolated profile directories (e.g., data/fluid/).

To prevent Out-Of-Memory (OOM) crashes when processing hours of high-dimensional multi-modal gameplay, Golem does not duplicate flat arrays in memory. Instead, it constructs a lightweight pointer map consisting of tuples (file_idx, start_idx, is_mirrored, is_first). During training, continuous arrays are lazily sliced on-the-fly into strictly contiguous, non-overlapping sequences with a stride equal to \(L\).

To prevent Stateful BPTT Continuity Collapse (where a naive dataloader would effectively teleport the agent's memory hundreds of frames into the future between batches), PyTorch's default DataLoader iteration is replaced with a custom StatefulStratifiedBatchSampler. This sampler abandons a flat list structure and instead manages \(B\) independent parallel streams. It guarantees that row \(b\) in batch step \(k+1\) is the exact chronological continuation of row \(b\) from step \(k\), seamlessly preserving the ODE time constants across the entire epoch.

Given an expert trajectory of length \(T\), defined as \(\tau=\{(o_1,y_1),(o_2,y_2),\dots,(o_T,y_T)\}\), and a fixed temporal sequence length \(L\) (e.g., \(L=32\)), we extract sequence batches. With the introduction of multi-modal sensor fusion, the observation \(o_t\) is a composite of visual, auditory, and thermal inputs. The input tensor sequences \(\mathbf{X}^{(vis)}_i\), \(\mathbf{X}^{(aud)}_i\), and \(\mathbf{X}^{(thm)}_i\), and the target action sequence \(\mathbf{Y}_i\) starting at index \(i\) are:

\[ \mathbf{X}^{(vis)}_i=\{o^{(vis)}_t\}_{t=i}^{i+L-1},\quad\mathbf{X}^{(vis)}_i\in\mathbb{R}^{L\times C\times64\times64} \]
\[ \mathbf{X}^{(aud)}_i=\{o^{(aud)}_t\}_{t=i}^{i+L-1},\quad\mathbf{X}^{(aud)}_i\in\mathbb{R}^{L\times2\times H_{mels}\times W_{time}} \]
\[ \mathbf{X}^{(thm)}_i=\{o^{(thm)}_t\}_{t=i}^{i+L-1},\quad\mathbf{X}^{(thm)}_i\in\{0,1\}^{L\times1\times64\times64} \]
\[ \mathbf{Y}_i=\{y_t\}_{t=i}^{i+L-1},\quad\mathbf{Y}_i\in\{0,1\}^{L\times n_\rho} \]

Where \(C \in \{3,4\}\) depends on whether the depth buffer is enabled, and \(n_\rho\) is the dimensionality of the action space dictated by the active environment profile \(\rho\) (e.g., Basic, Classic, Fluid).


2. The Objective Function (BCE & Focal Loss)

At each time step \(t\), the network outputs a vector of raw logits \(\mathbf{z}_t\in\mathbb{R}^{n_\rho}\). Because the action space allows for simultaneous key presses (e.g., strafing right while firing), the foundation of the objective is evaluated using Binary Cross-Entropy (BCE) with Logits Loss.

The baseline BCE loss \(\mathcal{L}_{BCE}\) for a single sequence of length \(L\) over \(n_\rho\) independent binary action channels is computed as:

\[ \mathcal{L}_{BCE}(\theta)=-\frac{1}{L\cdot n_\rho}\sum_{t=1}^{L}\sum_{j=1}^{n_\rho}\left[y_{t,j}\log(\sigma(z_{t,j}))+(1-y_{t,j})\log(1-\sigma(z_{t,j}))\right] \]

Where \(\sigma(\cdot)\) is the Sigmoid activation function, \(y_{t,j}\) is the ground truth label, and \(z_{t,j}\) is the network's prediction.

However, pure BCE treats all errors equally. Because human expert demonstrations consist overwhelmingly of simple navigation frames (the "Hold W Trap"), the cumulative gradient of these easily classified background actions overwhelms the sparse, high-value gradients of rare actions like combat.

To cure this convergence trap, Golem implements Focal Loss, which extends the BCE formulation by introducing a dynamically scaled modulating factor. Let \(p_{t,j}=\sigma(z_{t,j})\). The Focal Loss \(\mathcal{L}_{focal}\) is computed as:

\[ \mathcal{L}_{focal}(\theta)=-\frac{1}{L\cdot n_\rho}\sum_{t=1}^{L}\sum_{j=1}^{n_\rho}\left[\alpha y_{t,j}(1-p_{t,j})^\gamma\log(p_{t,j})+(1-\alpha)(1-y_{t,j})p_{t,j}^\gamma\log(1-p_{t,j})\right] \]
  • The Focusing Parameter (\(\gamma\)): As the model's confidence in a correct prediction increases (\(p_{t,j}\to1\) for positive classes, or \(p_{t,j}\to0\) for negative classes), the modulating factor \((1-p_{t,j})^\gamma\) decays to zero. This exponentially suppresses the gradient contribution of easily classified navigation frames, forcing the optimizer to focus strictly on hard, misclassified instances. Standard BCE is recovered when \(\gamma=0\).
  • The Weighting Factor (\(\alpha\)): A static scalar (e.g., \(\alpha=0.25\)) that balances the intrinsic priority of positive targets versus negative targets, mitigating the sheer volume of 0s (keys not pressed) in the multi-label distribution.

The network parameters \(\theta\) are subsequently updated via Backpropagation Through Time (BPTT).

Stateful Backpropagation Through Time (BPTT)

Because the LNN's Closed-form Continuous (CfC) cells require a continuous temporal flow to accurately accumulate evidence and trigger action potentials, the training loop utilizes Stateful BPTT. The hidden state output \(hx\) from a batch is retained, detached from the computational graph (\(hx = hx.detach()\)), and passed as the prior state for the subsequent batch. To prevent mathematical amnesia while respecting independent trajectory boundaries, the sampler streams maintain sequence chronologies, while a dynamic boolean mask zeros out the hidden state exclusively for sequences mapped to the start of a new .npz file, preventing "past life" momentum leakage.


3. Class Imbalance & Mirror Augmentation

While Focal Loss successfully mitigates action-frequency bias, human gameplay datasets also exhibit severe topological and spatial biases. For example, a dataset derived from a specific maze may contain an 80/20 ratio of left turns to right turns. Unmitigated, this spatial sparsity causes the network to collapse into localized minima, such as the "Zoolander Problem" (inability to turn right).

Golem counteracts spatial bias dynamically via Mirror Augmentation. During data streaming, the dataset yields reflected visual and thermal observation tensors \(o'^{(vis)}_t\) and \(o'^{(thm)}_t\) across the vertical axis (width):

\[ o'^{(vis)}_{t,c,h,w}=o^{(vis)}_{t,c,h,W-w-1} \]
\[ o'^{(thm)}_{t,c,h,w}=o^{(thm)}_{t,c,h,W-w-1} \]

If the auditory sensor is enabled, perfect spatial symmetry must also be maintained across the agent's "hearing." This is achieved by physically swapping the left and right stereo channels (channel index 0 and 1) across the 2D Mel Spectrogram:

\[ o'^{(aud)}_{t,c_{flip},h_{mel},w_{time}}=o^{(aud)}_{t,1-c_{flip},h_{mel},w_{time}} \]

To maintain ground-truth causality, the corresponding target vector \(y'_t\) must undergo a specific permutation. Let \(P_\rho\) be an \(n_\rho\times n_\rho\) permutation matrix defined by the active profile \(\rho\), which swaps the indices corresponding to strictly spatial actions:

  • \(idx_{\text{MoveLeft}}\leftrightarrow idx_{\text{MoveRight}}\)
  • \(idx_{\text{TurnLeft}}\leftrightarrow idx_{\text{TurnRight}}\)

All state-invariant actions (e.g., Attack, Use, NextWeapon) map to the identity matrix within \(P_\rho\). The augmented target vector is thus:

\[ y'_t=P_\rho y_t \]

This geometric inversion enforces perfect spatial symmetry in the agent's spatial reasoning, effectively doubling the dataset's topological variance without requiring additional recording sessions.


4. Covariate Shift & DAgger Intervention

A fundamental flaw of pure Behavioral Cloning is Covariate Shift (the "Perfect Play" trap). If the network is trained exclusively on flawless expert demonstrations, it never learns how to recover from mistakes. During live inference, a microscopic mathematical error will push the agent slightly off the optimal trajectory. Because this sub-optimal state \(s_{err}\) exists outside the training distribution, the agent's predictions become chaotic, and the errors rapidly compound until the agent is completely stuck.

To cure this, Golem employs DAgger (Dataset Aggregation). During live inference, the human expert monitors the autonomous agent. If the agent enters an equilibrium state (e.g., staring into a corner), the human holds a hotkey to instantly suspend the LNN's logits and hijack the controls.

To ensure the network understands the causal sequence that led to the error, the intervention pipeline utilizes a rolling collections.deque buffer. This temporarily stores the \(L\) autonomous frames immediately preceding the override. When the human operator takes control, this historical context is flushed into a _recovery trace alongside the corrective actions. This explicitly teaches the network not only the correction, but the specific sub-optimal visual precursors that demand it, actively teaching the network how to correct trajectory deviations.

Catastrophic Forgetting & Stratified Sampling

When applying the DAgger interventions, pure sequential fine-tuning on the _recovery traces would cause the model to suffer from catastrophic forgetting. The continuous differential equations governing the LNN's hidden state would overfit to the highly localized corrective vectors, effectively collapsing the broader phenomenological heuristics previously learned for normal navigation.

To maintain structural integrity of the dynamical system, Golem utilizes deterministic Stratified Sampling. The custom StatefulStratifiedBatchSampler explicitly allocates a percentage of the parallel batch streams (e.g., 25%) strictly to recovery sequences, and the remaining 75% to base expert play. This mathematically guarantees that every backpropagation step contains a balanced gradient representing both the optimal base policy \(\pi^*\) and the localized recovery vectors, perfectly preserving general topological reasoning while routing the optimizer's computational effort toward out-of-distribution corrections.


5. Curriculum Learning & Procedural Priors

To generalize beyond fixed configurations and prevent spatial overfitting, Golem relies on dynamic procedural generation. However, pure uniform random sampling over map variables (\(x_i \sim U(a_i, b_i)\)) generates degenerate geometry that pollutes gradient flow. For example, spawning a boss swarm inside a micro-sized corridor causes hitboxes to clip, generating erratic optical flow that conflicts with continuous-time causal learning.

To enforce topological coherence, the CurriculumObligeGenerator establishes a structured probability distribution across multiple training epochs.

Phase-Based Variance Constraints

The continuous hidden state \(x(t)\) requires time to establish a baseline causal understanding of spatial geometry before dealing with hyper-complex disruptions. Curriculum learning parameterizes the sample bounds based on the epoch phase \(t\):

\[ x_i \sim U(a_i(t), b_i(t)) \]
  • Phase 1 (Stabilization): Constrains sampling to simple topologies. It explicitly removes hazards like teleporters, which instantly replace the visual manifold and cause "Amnesia Traps" by collapsing the ODE's temporal derivative \(\frac{dx}{dt}\) before the network has learned to persist memory through structural jumps.
  • Phase 2 (Expansion): Introduces verticality, complex room flow, and moderate threat density.
  • Phase 3 (Generalization): Opens the bounds to the full multi-variate distribution defined in the configuration, forcing the model to adapt to arbitrary topological extremes.

Conditional Probabilities (Bayesian Networks)

To reject physically impossible edge cases within a phase, the generator applies a strict Bayesian network of conditional dependencies, rather than assuming independence (\(P(M) = \prod P(x_i)\)).

Map parameters are mathematically coupled. For instance, the probability of monster density is evaluated conditionally on the volumetric capacity of the level:

\[ P(\text{mons} \mid \text{size} = \text{micro}) \]

By routing the randomizer logic through conditional priors (e.g., forcibly down-scaling enemy density if the geometry footprint is insufficient, or enforcing outdoor generation if steepness is set to an epic scale), the procedural pipeline guarantees that every generated map provides mathematically sound gradients, maximizing the utility of the training data.


6. Diagnostic Auditing & Validation

Because the aggregate loss scalar \(\mathcal{L}(\theta)\) fundamentally obscures multi-label class imbalances (a model that never shoots will still achieve 95% accuracy if the "Attack" label is sparse), Golem utilizes a dedicated static audit module.

The audit evaluates the trained weights over a validation slice by generating a strictly thresholded (\(\sigma(\mathbf{z})>0.5\)) Confusion Matrix for every individual channel \(j\) in the \(n_\rho\) action space. It evaluates the network based on:

Precision (\(P_j\))

The probability that the agent's decision to act was correct.

\[ P_j=\frac{TP_j}{TP_j+FP_j} \]

Recall (\(R_j\)):

The probability that the agent successfully reacted to an environmental stimulus requiring action \(j\).

\[ R_j=\frac{TP_j}{TP_j+FN_j} \]

Where \(TP\), \(FP\), and \(FN\) represent True Positives, False Positives, and False Negatives, respectively.

Addressing Redundancy (Stride)

Historically, during normal training, the sliding window overlapped by shifting exactly one frame per sequence step. However, during auditing, evaluating identical frames repeatedly artificially inflates the diagnostic support counts and yields inaccurate exact-match metrics. To resolve this, the audit dataloader enforces a sequence stride equal to \(L\), meaning non-overlapping segments are tested to precisely evaluate every frame only once. This non-overlapping stride is now mirrored in the training dataloader to satisfy the strict chronological requirements of Stateful BPTT.


API Reference

The data extraction, LNN optimization, evaluation mechanics, and curriculum parameters are orchestrated by the handlers below.

The Training Loop

Trains the Liquid Neural Network using captured expert demonstrations.

This function orchestrates the dataset streaming and the model's training loop. It dynamically selects the best available hardware accelerator (CUDA, MPS, or CPU), initializes the dataset with optional mirror augmentation, and optimizes the network weights using the Adam optimizer.

If an active model already exists for the current profile (e.g., fluid), it loads the existing weights to perform continuous fine-tuning. Upon completion, it saves the updated model to both a timestamped archive and the active profile slot.

Parameters:

Name Type Description Default
cfg GolemConfig

The centralized application configuration object.

required
module_name str

The specific data module to train against (e.g., "combat", "navigation"). If "all" or None, it trains across all available data for the active profile (Generalization Mode). Default: None.

None
Source code in app/pipeline/train.py
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
@register_command("train")
def train(cfg: GolemConfig, module_name: str = None, include_recovery: bool = False):
    r"""
    Trains the Liquid Neural Network using captured expert demonstrations.

    This function orchestrates the dataset streaming and the model's training loop. It dynamically selects the best available hardware accelerator (CUDA, MPS, or CPU), initializes the dataset with optional mirror augmentation, and optimizes the network weights using the Adam optimizer.

    If an active model already exists for the current profile (e.g., ``fluid``), it loads the existing weights to perform continuous fine-tuning. Upon completion, it saves the updated model to both a timestamped archive and the active profile slot.

    Args:
        cfg (GolemConfig): The centralized application configuration object.
        module_name (str, optional): The specific data module to train against (e.g., "combat", "navigation"). If ``"all"`` or ``None``, it trains across all available data for the active profile (Generalization Mode). Default: ``None``.
    """
    if torch.backends.mps.is_available():
        device = torch.device("mps")
        logger.info("Apple Metal (MPS) acceleration detected and enabled.")
    elif torch.cuda.is_available():
        device = torch.device("cuda")
        logger.info("CUDA acceleration detected and enabled.")
    else:
        device = torch.device("cpu")
        logger.warning("No GPU detected. Training will be slow on CPU.")

    active_profile = cfg.brain.mode
    base_data_dir = Path(resolve_path(cfg.data.dirs["training"])) / active_profile

    # 1. Load the Dataset
    if module_name and module_name.lower() == "all":
        file_pattern = f"{cfg.data.prefix}*.npz"
    else:
        file_pattern = f"{cfg.data.prefix}{module_name}*.npz"

    # Aggregate target directories
    data_dirs = [base_data_dir]
    if include_recovery:
        recovery_dir = base_data_dir / "recovery"
        if recovery_dir.exists():
            data_dirs.append(recovery_dir)
            logger.info("Recovery (DAgger) data will be included in this training run.")
        else:
            logger.warning(f"Recovery directory {recovery_dir} not found. Proceeding without recovery data.")

    dataset = DoomStreamingDataset(
        data_dirs, 
        seq_len=cfg.training.sequence_length,
        file_pattern=file_pattern,
        augment=cfg.training.augmentation.mirror,
        action_names=cfg.training.action_names,
        dsp_config=cfg.brain.dsp,
        sensors=cfg.brain.sensors
    )

    if len(dataset) == 0:
        logger.error(f"No training data found matching pattern: {file_pattern} in {data_dirs}")
        return

    # Mount the Stratified Sampler 
    sampler = StatefulStratifiedBatchSampler(
        base_episodes=dataset.base_episodes,
        recovery_episodes=dataset.recovery_episodes,
        batch_size=cfg.training.batch_size,
        recovery_ratio=0.25 if include_recovery else 0.0
    )

    # Remove shuffle=False and drop_last=True, as the Sampler handles sequence logic internally
    dataloader = DataLoader(dataset, batch_sampler=sampler)

    n_actions = cfg.training.action_space_size

    # 1. Discover architecture and dimensions if resuming training
    model_dir = Path(resolve_path(cfg.data.dirs["model"])) / active_profile
    active_model_path = base_data_dir / "golem.pth"  # 
    state_dict = None
    archives = []

    if active_model_path.exists():
        logger.info(f"Discovering existing brain architecture from {active_model_path} for fine-tuning...")
        state_dict = torch.load(str(active_model_path), map_location=device, weights_only=True)

        if 'output.weight' in state_dict:
            n_actions = state_dict['output.weight'].shape[0]

        archives = list(model_dir.glob("*.pth"))

    apply_latest_parameters(cfg, archives)

    # 3. Initialize dynamic model
    model = DoomLiquidNet(
        n_actions=n_actions,
        cortical_depth=cfg.brain.cortical_depth,
        working_memory=cfg.brain.working_memory,
        sensors=cfg.brain.sensors,
        dsp_config=cfg.brain.dsp
    ).to(device)

    if state_dict:
        model.load_state_dict(state_dict)

    if cfg.training.loss == LossType.FOCAL:
        logger.info("Initializing Focal Loss with static alpha vector from configuration...")

        alpha_vector = np.full(n_actions, cfg.loss.focal.alpha)

        alpha_tensor = torch.tensor(alpha_vector, dtype=torch.float32).to(device)
        criterion = FocalLossWithLogits(
            alpha=alpha_tensor, 
            gamma=cfg.loss.focal.gamma
        )

    elif cfg.training.loss == LossType.BCE:
        criterion = nn.BCEWithLogitsLoss()

    elif cfg.training.loss == LossType.ASL:
        logger.info(f"Initializing Asymmetric Loss (gamma_neg={cfg.loss.asymmetric.gamma_neg}, gamma_pos={cfg.loss.asymmetric.gamma_pos})...")
        criterion = AsymmetricLoss(
            gamma_neg=cfg.loss.asymmetric.gamma_neg,
            gamma_pos=cfg.loss.asymmetric.gamma_pos,
            clip=cfg.loss.asymmetric.clip
        )
    elif cfg.training.loss == LossType.SMOOTH:
        # Initialize the new Label Smoothed BCE
        logger.info(f"Initializing Label Smoothing BCE (epsilon={cfg.loss.smooth.epsilon})...")
        criterion = LabelSmoothingBCEWithLogits(epsilon=cfg.loss.smooth.epsilon)
    else:
        criterion = nn.BCEWithLogitsLoss()

    optimizer = optim.Adam(model.parameters(), lr=cfg.training.learning_rate)

    logger.info(f"Starting training for {cfg.training.epochs} epochs...")
    model.train()
    hx = None # Initialize global hidden state

    start_time = time.time()

    for epoch in range(cfg.training.epochs):
        total_loss = 0
        batches = 0

        for batch_idx, (inputs, actions) in enumerate(dataloader):
            x_vis = inputs['visual'].to(device)
            x_aud = inputs['audio'].to(device) if 'audio' in inputs else None
            x_thm = inputs['thermal'].to(device) if 'thermal' in inputs else None
            actions = actions.to(device)

            optimizer.zero_grad()
            predictions, new_hx = model(x_vis, x_aud=x_aud, x_thm=x_thm, hx=hx)            

            # 1. Mask the hidden state for individual sequence resets
            is_first = inputs['is_first'].to(device).float() # Shape: (batch_size, 1)

            if hx is not None:
                # Zero out hx for batch indices where is_first == 1
                mask = 1.0 - is_first 
                if isinstance(hx, (list, tuple)):
                    hx = [h * mask for h in hx]
                else:
                    hx = hx * mask

            # Use dynamic n_actions for the safety check
            if actions.shape[2] != n_actions:
                logger.error(f"CRITICAL: Data Mismatch! Found {actions.shape[2]} actions in data, but Brain expects {n_actions}.")
                return

            loss = criterion(predictions, actions)
            loss.backward()
            optimizer.step()

            # DETACH hx to prevent backpropagating through the entire history of the session
            # This implements the "Truncated" part of Truncated BPTT
            if isinstance(new_hx, (list, tuple)):
                hx = [h.detach() for h in new_hx]
            else:
                hx = new_hx.detach()

            total_loss += loss.item()
            batches += 1

            if batch_idx % 50 == 0:
                logger.info(f"Epoch {epoch+1} | Batch {batch_idx} | Loss: {loss.item():.5f}")

        avg_loss = total_loss / batches if batches > 0 else 0
        logger.info(f"Epoch {epoch+1}/{cfg.training.epochs} complete. Average Loss: {avg_loss:.5f}")

    duration = time.time() - start_time
    logger.info(f"Training finished in {duration:.2f}s.")

    # Save the archive model using the dynamic prefix generator
    date_str = datetime.now().strftime("%Y-%m-%d")
    model_prefix = generate_model_prefix(cfg, date_str)

    archive_path = get_unique_filename(model_dir, model_prefix, "pth")

    torch.save(model.state_dict(), archive_path)
    logger.info(f"Model archive saved to: {archive_path}")

    # Update the active model
    torch.save(model.state_dict(), str(active_model_path))
    logger.info(f"Active model updated at: {active_model_path}")

DAgger Intervention

Source code in app/pipeline/intervene.py
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
@register_command("intervene")
def intervene(cfg: GolemConfig, module_name: str = "combat"):
    try:
        from pynput import keyboard
    except ImportError:
        logger.error("pynput is required for DAgger. Run: pip install pynput")
        return

    if torch.backends.mps.is_available():
        device = torch.device("mps")
    elif torch.cuda.is_available():
        device = torch.device("cuda")
    else:
        device = torch.device("cpu")

    logger.info(f"Loading Golem Brain on {device} for DAgger Intervention...")

    # Load Model from standard active model location
    active_profile = cfg.brain.mode
    model_path = Path(resolve_path(cfg.data.dirs["training"])) / active_profile / "golem.pth"
    n_actions = cfg.training.action_space_size 

    model = DoomLiquidNet(
        n_actions=n_actions,
        cortical_depth=cfg.brain.cortical_depth,
        working_memory=cfg.brain.working_memory,
        sensors=cfg.brain.sensors,
        dsp_config=cfg.brain.dsp
    ).to(device)

    try:
        model.load_state_dict(torch.load(str(model_path), map_location=device))
        model.eval()
    except FileNotFoundError:
        logger.error(f"No model found at {model_path}. Train first!")
        return

    if module_name not in cfg.modules:
        logger.error(f"Module '{module_name}' not found.")
        return

    # Setup Environment using brain.mode
    active_profile = cfg.brain.mode
    cfg_path = cfg.config[active_profile]
    module = cfg.modules[module_name]
    scenario = module.scenario
    map_name = module.map

    # Configure engine to always generate all buffers for the dataset regardless of the loaded brain
    all_sensors = SensorsConfig(visual=True, depth=True, audio=True, thermal=True)
    # always get all_sensors for recording data
    game = get_game(cfg_path, scenario, all_sensors, map_name=map_name)
    game.init()

    # Get active bindings for the enabled brain mode
    active_bindings = cfg.keybindings.get(active_profile, {})
    action_labels = cfg.training.action_names

    # Pass bindings to dynamically map keys
    controller = InterventionController(action_labels, active_bindings)

    # Rolling Context Buffers
    auto_frames = deque(maxlen=cfg.training.sequence_length)
    auto_depths = deque(maxlen=cfg.training.sequence_length)
    auto_audios = deque(maxlen=cfg.training.sequence_length)
    auto_thermals = deque(maxlen=cfg.training.sequence_length)
    auto_actions = deque(maxlen=cfg.training.sequence_length)

    # Memory Buffer
    recovery_frames, recovery_depths, recovery_audios, recovery_actions, recovery_thermals \
            = [], [], [], [], []

    logger.info("======================================================")
    logger.info("DAgger Mode Active. Golem is running autonomously.")
    logger.info("HOLD [LEFT SHIFT] to pause the LNN and take manual control.")
    logger.info("Use WASD, Arrows, Space, and Q to steer.")
    logger.info("Release [LEFT SHIFT] to save the recovery memory and resume.")
    logger.info("======================================================")

    episodes = 5
    was_intervening = False
    for i in range(episodes):
        game.new_episode()
        hx = None
        last_known_buffers = {}

        while not game.is_episode_finished():
            # Physiological Reset (Death)
            if game.is_player_dead():
                hx = None

            state = game.get_state()
            if state is None:
                game.advance_action()
                continue

            # 1. Extract EVERYTHING for recording
            extracted_all = SensoryExtractor.get_numpy_state(state, all_sensors)

            # Zero-Order Hold for the recording buffers
            for mod in ['visual', 'depth', 'audio', 'thermal']:
                if mod in extracted_all:
                    last_known_buffers[mod] = extracted_all[mod]
                elif mod in last_known_buffers:
                    extracted_all[mod] = last_known_buffers[mod]

            # 2. Extract ONLY what the brain expects for inference
            # We can use the patched extracted_all dictionary to prevent inference stutter!
            extracted_brain = {k: extracted_all[k] for k in ['visual', 'depth', 'audio', 'thermal'] if getattr(cfg.brain.sensors, k, False)}

            # Wait for warmup
            if len(last_known_buffers) < 4:
                game.advance_action()
                continue

            tensors = SensoryExtractor.to_tensors(extracted_brain, device)
            with torch.no_grad():
                logits, hx = model(
                    tensors.get('visual'), 
                    x_aud=tensors.get('audio'), 
                    x_thm=tensors.get('thermal'), 
                    hx=hx
                )                
                probs = torch.sigmoid(logits)

            if controller.intervening:
                # 1. Fetch the human's corrective action FIRST
                action = controller.get_action_vector()

                if not was_intervening:
                    # 2. Flush the autonomous rolling context into the recovery buffers
                    recovery_frames.extend(auto_frames)
                    recovery_depths.extend(auto_depths)
                    recovery_audios.extend(auto_audios)
                    recovery_thermals.extend(auto_thermals)

                    # 3. RETROACTIVE CORRECTION: Apply the human action to the historical frames
                    recovery_actions.extend([action] * len(auto_frames))

                    was_intervening = True

                # Append current active intervention arrays using 'extracted_all'
                recovery_frames.append(extracted_all['visual'])
                recovery_depths.append(extracted_all['depth'])
                recovery_audios.append(extracted_all['audio'])
                recovery_thermals.append(extracted_all['thermal'])

                recovery_actions.append(action)

                if game.get_episode_time() % 35 == 0:
                    logger.warning(f"OVERRIDE ACTIVE | Recording frame {len(recovery_frames)}...")
            else:
                was_intervening = False
                action_probs = probs.cpu().numpy()[0, 0]
                action = (action_probs > cfg.brain.activation).astype(int).tolist()

                # POPULATE THE AUTONOMOUS ROLLING BUFFERS using 'extracted_all'
                if 'visual' in extracted_all: auto_frames.append(extracted_all['visual'])
                if 'depth' in extracted_all: auto_depths.append(extracted_all['depth'])
                if 'audio' in extracted_all: auto_audios.append(extracted_all['audio'])
                if 'thermal' in extracted_all: auto_thermals.append(extracted_all['thermal'])
                auto_actions.append(action)

                # Save block (Triggers when the user releases the intervention key)
                if len(recovery_frames) > 0:
                    output_dir = Path(resolve_path(cfg.data.dirs["training"])) / active_profile / "recovery"
                    prefix_clean = cfg.data.prefix.rstrip('_')
                    file_prefix = f"{prefix_clean}_{module_name}_recovery"

                    output_path = get_unique_filename(output_dir, file_prefix, "npz")
                    logger.info(f"Saving {len(recovery_frames)} recovery frames to {output_path}...")

                    save_dict = {'frames': np.array(recovery_frames), 'actions': np.array(recovery_actions)}
                    if len(recovery_depths) > 0:
                        save_dict['depths'] = np.array(recovery_depths)
                    if len(recovery_audios) > 0:
                        save_dict['audios'] = np.array(recovery_audios)
                    if len(recovery_thermals) > 0:
                        save_dict['thermals'] = np.array(recovery_thermals)

                    np.savez_compressed(output_path, **save_dict)

                    # Clear memory buffers
                    recovery_frames, recovery_depths, recovery_audios, recovery_thermals, recovery_actions = [], [], [], [], []

            game.make_action(action)
            time.sleep(0.028)

        logger.info(f"Episode {i+1} Finished.")

    game.close()

Procedural Curriculum Pipeline

Bases: ObligeGenerator

An extension of the ObligeGenerator that replaces pure uniform random sampling with Stratified Curriculum Learning and Conditional Priors.

Source code in app/sample/curriculum.py
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
class CurriculumObligeGenerator(ObligeGenerator):
    """
    An extension of the ObligeGenerator that replaces pure uniform random sampling with Stratified Curriculum Learning and Conditional Priors.
    """
    def __init__(self, cfg: RandomizerConfig, phase: int = 1):
        super().__init__(cfg)
        self.phase = phase
        logger.info(f"Initialized Curriculum Generator at Phase {self.phase}")

    def _get_phase_constraints(self) -> Dict[str, list]:
        """
        Defines the parameter space based on the active curriculum phase.
        Phase 1: Simple navigation, minimal temporal complexity.
        Phase 2: Introduction of verticality, traps, and moderate combat.
        Phase 3: The full generalized distribution (complex topology, swarms).
        """
        if self.phase == 1:
            return {
                "size": ["micro", "small"],
                "theme": ["original", "tech"],
                "outdoors": ["none"],
                "steepness": ["none"],
                "liquids": ["none"],
                "teleporters": ["none"],
                "mons": ["none", "sparse"],
                "strength": ["easier", "normal"]
            }
        elif self.phase == 2:
            return {
                "size": ["regular"],
                "theme": ["urban", "hell", "mixed"],
                "outdoors": ["mixed"],
                "steepness": ["mixed"],
                "liquids": ["mixed"],
                "teleporters": ["none"], # Still constrain teleportation to prevent state amnesia
                "mons": ["normal", "lots"],
                "strength": ["normal"]
            }
        else:
            # Phase 3+: Full unbounded distribution defined in app.yaml
            return self.base_oblige_config

    def _apply_conditional_priors(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """
        Enforces topological coherence by mathematically coupling dependent variables.
        Prevents degenerate combinations that generate conflicting gradients.
        """
        # Constraint 1: Volume vs. Density
        # A micro map cannot physically support a swarm without spawning enemies inside each other.
        if params.get("size") == "micro" and params.get("mons") in ["lots", "swarms"]:
            params["mons"] = "normal"
            logger.debug("Prior Triggered: Reduced monster density for micro-sized geometry.")

        # Constraint 2: Verticality vs. Outdoors
        # 'Epic' steepness in tight indoor corridors breaks ViZDoom's pathing heuristics.
        if params.get("steepness") == "plenty" and params.get("outdoors") == "none":
            params["outdoors"] = "mixed"
            logger.debug("Prior Triggered: Forced outdoor regions to accommodate extreme verticality.")

        # Constraint 3: The Teleporter Amnesia Trap
        # Teleporters instantly shift the visual manifold. If the network is untrained (Phase < 3),
        # this causes the CfC memory state to collapse. Ensure teleporters only exist in large maps.
        if params.get("teleporters") == "plenty" and params.get("size") in ["micro", "small"]:
            params["teleporters"] = "none"

        return params

    def sample_configuration(self) -> Dict[str, Any]:
        """
        Generates a valid, constrained random sample based on the current curriculum.
        """
        phase_space = self._get_phase_constraints()
        sampled_params = {}

        # 1. Stratified Sample within the Phase bounds
        for key, base_values in self.base_oblige_config.items():
            # If the phase defines a tighter bound, use it; otherwise, use the base config
            available_options = phase_space.get(key, base_values)

            # Fallback to base values if the phase constraint is accidentally empty
            if not available_options: 
                available_options = base_values

            sampled_params[key] = random.choice(available_options) if isinstance(available_options, list) else available_options

        # 2. Apply Bayesian constraints to the sampled vector
        return self._apply_conditional_priors(sampled_params)

    def build_map(self, filename: str = "golem_procgen.wad", overrides: dict = None) -> str:
        """
        Overrides the base build_map to inject the curriculum-sampled parameters 
        before compiling the BSP via the container.
        """
        # Generate the structured sample
        structured_params = self.sample_configuration()

        # Allow explicit runtime overrides (e.g., from the CLI) to take final precedence
        if overrides:
            structured_params.update(overrides)

        # Pass the pre-computed dictionary to the parent class, acting as complete overrides
        return super().build_map(filename, overrides=structured_params)

build_map(filename='golem_procgen.wad', overrides=None)

Overrides the base build_map to inject the curriculum-sampled parameters before compiling the BSP via the container.

Source code in app/sample/curriculum.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
def build_map(self, filename: str = "golem_procgen.wad", overrides: dict = None) -> str:
    """
    Overrides the base build_map to inject the curriculum-sampled parameters 
    before compiling the BSP via the container.
    """
    # Generate the structured sample
    structured_params = self.sample_configuration()

    # Allow explicit runtime overrides (e.g., from the CLI) to take final precedence
    if overrides:
        structured_params.update(overrides)

    # Pass the pre-computed dictionary to the parent class, acting as complete overrides
    return super().build_map(filename, overrides=structured_params)

sample_configuration()

Generates a valid, constrained random sample based on the current curriculum.

Source code in app/sample/curriculum.py
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
def sample_configuration(self) -> Dict[str, Any]:
    """
    Generates a valid, constrained random sample based on the current curriculum.
    """
    phase_space = self._get_phase_constraints()
    sampled_params = {}

    # 1. Stratified Sample within the Phase bounds
    for key, base_values in self.base_oblige_config.items():
        # If the phase defines a tighter bound, use it; otherwise, use the base config
        available_options = phase_space.get(key, base_values)

        # Fallback to base values if the phase constraint is accidentally empty
        if not available_options: 
            available_options = base_values

        sampled_params[key] = random.choice(available_options) if isinstance(available_options, list) else available_options

    # 2. Apply Bayesian constraints to the sampled vector
    return self._apply_conditional_priors(sampled_params)

Data Inspection & Auditing

audit(cfg, module_name='all', full=False, target_file=None)

Runs a diagnostic brain scan to evaluate the active model's predictive accuracy.

This function performs a forward pass on a subset of the dataset (up to 50 batches) without updating the model weights. It compares the model's action probabilities against the ground-truth human actions and calculates the Precision, Recall, and Support for each action class. This is critical for identifying whether the agent is successfully learning rare actions (like shooting) or if it has fallen into a convergence trap.

Parameters:

Name Type Description Default
cfg GolemConfig

The centralized application configuration object.

required
module_name str

The specific module to audit against (e.g., "combat", "navigation"). If "all", it evaluates against all available data for the active profile. Default: "all".

'all'
full bool

If True, evaluates the entire dataset instead of capping at 50 sequence batches. Default: False

False
target_file str

The specific model file to load.

None
Source code in app/metrics/audit.py
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
@register_command("audit")
def audit(cfg: GolemConfig, module_name: str = "all", full: bool = False, target_file: str = None):
    r"""
    Runs a diagnostic brain scan to evaluate the active model's predictive accuracy.

    This function performs a forward pass on a subset of the dataset (up to 50 batches) without updating the model weights. It compares the model's action probabilities against the ground-truth human actions and calculates the Precision, Recall, and Support for each action class. This is critical for identifying whether the agent is successfully learning rare actions (like shooting) or if it has fallen into a convergence trap.

    Args:
        cfg (GolemConfig): The centralized application configuration object.
        module_name (str, optional): The specific module to audit against 
            (e.g., "combat", "navigation"). If "all", it evaluates against all 
            available data for the active profile. Default: ``"all"``.
        full (bool, optional): If ``True``, evaluates the entire dataset instead 
            of capping at 50 sequence batches. Default: ``False``
        target_file (str, optional): The specific model file to load.
    """
    device = torch.device('mps') if torch.backends.mps.is_available() else torch.device("cpu")

    # 1. Load Data
    active_profile = cfg.brain.mode
    data_dir = Path(resolve_path(cfg.data.dirs["training"])) / active_profile
    prefix_clean = cfg.data.prefix.rstrip('_')

    if module_name and module_name.lower() == "all":
        file_pattern = f"{prefix_clean}_*.npz"
    else:
        file_pattern = f"{prefix_clean}_{module_name}*.npz"

    dataset = DoomStreamingDataset(
        str(data_dir), 
        seq_len=cfg.training.sequence_length,
        file_pattern=file_pattern,
        augment=cfg.training.augmentation.mirror,
        action_names=cfg.training.action_names,
        dsp_config=cfg.brain.dsp,
        sensors=cfg.brain.sensors
    )
    dataloader = DataLoader(dataset, batch_size=32, shuffle=False)

    # 2. Discover Brain Architecture & Load Model
    model_dir = Path(resolve_path(cfg.data.dirs["model"])) / active_profile

    if target_file:
        active_model_path = model_dir / target_file
        if not active_model_path.exists():
            logger.error(f"Target model file not found: {active_model_path}")
            return
        archives = [active_model_path]
    else:
        active_model_path = Path(resolve_path(cfg.data.dirs["training"])) / active_profile / "golem.pth"
        # Intelligently discover actual parameters from the latest archive
        archives = list(model_dir.glob("*.pth"))

    apply_latest_parameters(cfg, archives)

    try:
        # Load state dict first to intelligently resolve n_actions and avoid tensor mismatches 
        # caused by runtime config overrides not updating the action space size.
        state_dict = torch.load(str(active_model_path), map_location=device, weights_only=True)

        n_actions = cfg.training.action_space_size
        if 'output.weight' in state_dict:
            n_actions = state_dict['output.weight'].shape[0]

        model = DoomLiquidNet(
            n_actions=n_actions,
            cortical_depth=cfg.brain.cortical_depth,
            working_memory=cfg.brain.working_memory,
            sensors=cfg.brain.sensors,
            dsp_config=cfg.brain.dsp
        ).to(device)

        model.load_state_dict(state_dict)
        model.eval()
    except FileNotFoundError:
        logger.error(f"No brain found at {active_model_path}. Train first!")
        return

    # 3. Scan
    logger.info(f"Scanning neural pathways (Module: {module_name})...")

    all_preds = []
    all_targets = []
    max_batches = float('inf') if full else 50

    with torch.no_grad():
        for i, (inputs, actions) in enumerate(dataloader):
            if i >= max_batches: 
                break

            x_vis = inputs['visual'].to(device)
            x_aud = inputs['audio'].to(device) if 'audio' in inputs else None
            x_thm = inputs['thermal'].to(device) if 'thermal' in inputs else None

            logits, _ = model(x_vis, x_aud=x_aud, x_thm=x_thm)
            preds = (torch.sigmoid(logits) > 0.5).float().cpu().numpy()
            targets = actions.cpu().numpy()

            # Use dynamic n_actions rather than potentially stale cfg.training.action_space_size
            all_preds.append(preds.reshape(-1, n_actions))
            all_targets.append(targets.reshape(-1, n_actions))

    if not all_preds:
        logger.error("No data found to audit!")
        return

    y_pred = np.concatenate(all_preds)
    y_true = np.concatenate(all_targets)

    # 4. Report via Jinja2
    exact_acc = accuracy_score(y_true, y_pred)
    action_names = list(cfg.training.action_names)

    # Pad action names if the runtime config didn't dynamically expand
    if len(action_names) < n_actions:
        action_names += [f"ACTION_{i}" for i in range(len(action_names), n_actions)]

    metrics = []

    for i, name in enumerate(action_names):
        if i >= y_true.shape[1]:
            break

        true_col = y_true[:, i]
        pred_col = y_pred[:, i]
        support = int(true_col.sum())

        tp = ((true_col == 1) & (pred_col == 1)).sum()
        fp = ((true_col == 0) & (pred_col == 1)).sum()
        fn = ((true_col == 1) & (pred_col == 0)).sum()

        precision = tp / (tp + fp + 1e-9)
        recall = tp / (tp + fn + 1e-9)

        metrics.append({
            "name": name,
            "precision": precision,
            "recall": recall,
            "support": support
        })

    env = Environment(loader=FileSystemLoader(resolve_path("app/templates")))
    template = env.get_template("audit.j2")

    print(template.render(
        module_name=module_name,
        sample_count=len(y_true),
        exact_acc=exact_acc,
        metrics=metrics
    ))

Analysis Module: Diagnostics and Validation.

This module provides tools for inspecting the integrity of the ETL pipeline's output data and auditing the performance of trained models. It ensures datasets are balanced and normal, and generates precision/recall matrices to evaluate model convergence.

examine(cfg, module_name='all', target_file=None, index=0)

Generates a phenomenological saliency map (Grad-CAM) for a specific sequence.

This evaluates the model's visual and thermal cortices to identify which spatial pixels triggered the agent's highest-probability action prediction.

Parameters:

Name Type Description Default
cfg GolemConfig

Centralized configuration object.

required
module_name str

The specific module dataset to pull a sequence from. Default: "all"

'all'
target_file str

The specific model file to load.

None
index int

The batch index in the dataset to examine. Default: 0

0
Source code in app/metrics/examine.py
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
@register_command("examine")
def examine(cfg: GolemConfig, module_name: str = "all", target_file: str = None, index: int = 0):
    r"""
    Generates a phenomenological saliency map (Grad-CAM) for a specific sequence.

    This evaluates the model's visual and thermal cortices to identify which spatial 
    pixels triggered the agent's highest-probability action prediction.

    Args:
        cfg (GolemConfig): Centralized configuration object.
        module_name (str, optional): The specific module dataset to pull a sequence from. Default: ``"all"``
        target_file (str, optional): The specific model file to load.
        index (int, optional): The batch index in the dataset to examine. Default: 0
    """
    try:
        import matplotlib.pyplot as plt
        from captum.attr import LayerGradCam, LayerAttribution
    except ImportError:
        logger.error("Captum and matplotlib are required. Run: pip install captum matplotlib")
        return

    device = torch.device('mps') if torch.backends.mps.is_available() else torch.device("cpu")
    active_profile = cfg.brain.mode
    data_dir = Path(resolve_path(cfg.data.dirs["training"])) / active_profile

    # 1. Load the Dataset
    if module_name and module_name.lower() == "all":
        file_pattern = f"{cfg.data.prefix}*.npz"
    else:
        file_pattern = f"{cfg.data.prefix}{module_name}*.npz"

    dataset = DoomStreamingDataset(
        [ str(data_dir) ], 
        seq_len=cfg.training.sequence_length,
        file_pattern=file_pattern,
        augment=False,
        action_names=cfg.training.action_names,
        dsp_config=cfg.brain.dsp,
        sensors=cfg.brain.sensors
    )

    if len(dataset) == 0:
        logger.error("No data found to examine.")
        return

    safe_index = min(max(0, index), len(dataset) - 1)
    logger.info(f"Loading sequence {safe_index} from dataset...")
    inputs, actions = dataset[safe_index]

    # 2. Discover Brain Architecture & Load Model
    model_dir = Path(resolve_path(cfg.data.dirs["model"])) / active_profile

    if target_file:
        active_model_path = model_dir / target_file
        if not active_model_path.exists():
            logger.error(f"Target model file not found: {active_model_path}")
            return
        archives = [active_model_path]
    else:
        active_model_path = Path(resolve_path(cfg.data.dirs["training"])) / active_profile / "golem.pth"
        archives = list(model_dir.glob("*.pth"))

    apply_latest_parameters(cfg, archives)

    try:
        state_dict = torch.load(str(active_model_path), map_location=device, weights_only=True)
        n_actions = cfg.training.action_space_size
        if 'output.weight' in state_dict:
            n_actions = state_dict['output.weight'].shape[0]

        model = DoomLiquidNet(
            n_actions=n_actions,
            cortical_depth=cfg.brain.cortical_depth,
            working_memory=cfg.brain.working_memory,
            sensors=cfg.brain.sensors,
            dsp_config=cfg.brain.dsp
        ).to(device)
        model.load_state_dict(state_dict)
        model.eval()
    except FileNotFoundError:
        logger.error(f"No brain found at {active_model_path}. Train first!")
        return

    # 3. Prepare Tensors
    # Add batch dimension: (1, Seq_Len, C, H, W)
    x_vis = inputs['visual'].unsqueeze(0).to(device)
    x_aud = inputs['audio'].unsqueeze(0).to(device) if 'audio' in inputs else None
    x_thm = inputs['thermal'].unsqueeze(0).to(device) if 'thermal' in inputs else None

    seq_len = x_vis.size(1)

    # 4. Separate history (to build ODE state) from the final prediction frame
    if seq_len > 1:
        x_vis_hist = x_vis[:, :-1, ...]
        x_aud_hist = x_aud[:, :-1, ...] if x_aud is not None else None
        x_thm_hist = x_thm[:, :-1, ...] if x_thm is not None else None

        with torch.no_grad():
            _, hx = model(x_vis_hist, x_aud_hist, x_thm_hist)
    else:
        hx = None

    x_vis_step = x_vis[:, -1:, ...]
    x_aud_step = x_aud[:, -1:, ...] if x_aud is not None else None
    x_thm_step = x_thm[:, -1:, ...] if x_thm is not None else None

    # 5. Captum Wrapper
    wrapper = ModelWrapperStep(model, hx)

    # Find the most probable action to attribute
    with torch.no_grad():
        final_logits = wrapper(x_vis_step, x_aud_step, x_thm_step)
        probs = torch.sigmoid(final_logits)[0]

    target_idx = torch.argmax(probs).item()

    # Resolve the name dynamically in case the loaded weights expanded the action space
    action_names = list(cfg.training.action_names)
    if len(action_names) < n_actions:
        action_names += [f"ACTION_{i}" for i in range(len(action_names), n_actions)]
    target_name = action_names[target_idx]

    logger.info(f"Generating Grad-CAM attributing to highest predicted action: {target_name} ({probs[target_idx]:.2f})")

    # 6. Extract Visual Cortex Heatmap
    vis_layer = get_last_conv(model.conv)
    lgc_vis = LayerGradCam(wrapper, vis_layer)
    attr_vis = lgc_vis.attribute(x_vis_step, target=target_idx, additional_forward_args=(x_aud_step, x_thm_step))
    attr_vis = LayerAttribution.interpolate(attr_vis, (64, 64))

    attr_vis_np = attr_vis.squeeze().cpu().detach().numpy()
    attr_vis_np = np.maximum(attr_vis_np, 0)
    if np.max(attr_vis_np) > 0:
        attr_vis_np /= np.max(attr_vis_np)

    # 7. Extract Thermal Cortex Heatmap (If active)
    attr_thm_np = None
    if model.use_thermal:
        thm_layer = get_last_conv(model.thermal_conv)
        lgc_thm = LayerGradCam(wrapper, thm_layer)
        attr_thm = lgc_thm.attribute(x_vis_step, target=target_idx, additional_forward_args=(x_aud_step, x_thm_step))
        attr_thm = LayerAttribution.interpolate(attr_thm, (64, 64))

        attr_thm_np = attr_thm.squeeze().cpu().detach().numpy()
        attr_thm_np = np.maximum(attr_thm_np, 0)
        if np.max(attr_thm_np) > 0:
            attr_thm_np /= np.max(attr_thm_np)

    # 8. Render Side-by-Side Validation
    img_vis_rgb = x_vis_step[0, 0, :3, ...].permute(1, 2, 0).cpu().numpy()

    cols = 4 if model.use_thermal else 2
    fig, axes = plt.subplots(1, cols, figsize=(4 * cols, 4))
    if cols == 2: axes = [axes[0], axes[1]] # Ensure list formatting

    # Plot Visual
    axes[0].imshow(img_vis_rgb)
    axes[0].set_title("Visual Input (RGB)")
    axes[0].axis('off')

    axes[1].imshow(img_vis_rgb)
    axes[1].imshow(attr_vis_np, cmap='jet', alpha=0.5)
    axes[1].set_title(f"Visual Grad-CAM\nTarget: {target_name}")
    axes[1].axis('off')

    # Plot Thermal
    if model.use_thermal:
        img_thm = x_thm_step[0, 0, 0, ...].cpu().numpy()
        axes[2].imshow(img_thm, cmap='gray')
        axes[2].set_title("Thermal Input (Mask)")
        axes[2].axis('off')

        axes[3].imshow(img_thm, cmap='gray')
        axes[3].imshow(attr_thm_np, cmap='jet', alpha=0.5)
        axes[3].set_title(f"Thermal Grad-CAM\nTarget: {target_name}")
        axes[3].axis('off')

    out_path = Path("examine.png")
    plt.tight_layout()
    plt.savefig(out_path, dpi=150, bbox_inches='tight')
    plt.close(fig)

    logger.info(f"Saliency map saved successfully to: {out_path.absolute()}")

Analysis Module: Diagnostics and Validation.

This module provides tools for inspecting the integrity of the ETL pipeline's output data and auditing the performance of trained models. It ensures datasets are balanced and normal, and generates precision/recall matrices to evaluate model convergence.

inspect(cfg, target_file=None)

Analyzes a training dataset file for shape integrity and class balance.

This function loads a specific .npz recording and validates that the visual frames are properly normalized. It also aggregates the action vectors to report the distribution of actions taken, specifically flagging high "idle time" which can cause the network to converge to inaction due to class imbalance.

Parameters:

Name Type Description Default
cfg GolemConfig

The centralized application configuration object.

required
target_file str

The specific filename to inspect. If None, it automatically loads the most recently generated data file for the currently active profile. Default: None.

None
Source code in app/metrics/inspect.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
@register_command("inspect")
def inspect(cfg: GolemConfig, target_file: str = None):
    r"""
    Analyzes a training dataset file for shape integrity and class balance.

    This function loads a specific ``.npz`` recording and validates that the visual frames are properly normalized. It also aggregates the action vectors to report the distribution of actions taken, specifically flagging high "idle time" which can cause the network to converge to inaction due to class imbalance.

    Args:
        cfg (GolemConfig): The centralized application configuration object.
        target_file (str, optional): The specific filename to inspect. If ``None``, 
            it automatically loads the most recently generated data file for the 
            currently active profile. Default: ``None``.
    """
    active_profile = cfg.brain.mode

    if target_file:
        file_path = Path(target_file)
        if not file_path.is_absolute():
             file_path = Path(resolve_path(target_file))
    else:
        data_dir = Path(resolve_path(cfg.data.dirs["training"])) / active_profile

        files = list(data_dir.glob(f"{cfg.data.prefix}_*.npz"))

        if not files:
            logger.error(f"No data files found in {data_dir}")
            return
        file_path = sorted(files, key=lambda f: f.stat().st_mtime, reverse=True)[0]

    if not file_path.exists():
        logger.error(f"File not found: {file_path}")
        return

    logger.info(f"Analyzing: {file_path.name}")

    try:
        data = np.load(str(file_path))
        frames = data['frames']
        actions = data['actions']

        total_frames = len(actions)
        if total_frames == 0:
            logger.warning("Dataset is empty.")
            return

        total_presses = np.sum(actions, axis=0)
        labels = cfg.training.action_names
        action_counts = []

        for i, label in enumerate(labels):
            if i < len(total_presses):
                count = int(total_presses[i])
                pct = count / total_frames
                action_counts.append({"label": label, "count": count, "pct": pct})

        non_action_frames = np.sum(~actions.any(axis=1))
        idle_pct = non_action_frames / total_frames

        # Render Report
        env = Environment(loader=FileSystemLoader(resolve_path("app/templates")))
        template = env.get_template("inspect.j2")

        print(template.render(
            filename=file_path.name,
            frames_shape=frames.shape,
            frames_range=(frames.min(), frames.max()),
            is_normalized=(frames.max() <= 1.0),
            actions_shape=actions.shape,
            total_frames=total_frames,
            action_counts=action_counts,
            idle_count=non_action_frames,
            idle_pct=idle_pct
        ))

    except Exception as e:
        logger.error(f"Failed to inspect data: {e}", exc_info=True)

models(cfg, mode=None)

Lists available model archives.

Parameters:

Name Type Description Default
cfg GolemConfig

The centralized application configuration object.

required
mode str

The specific mode to list models for (e.g., "basic", "fluid"). If None, it evaluates against all available modes. Default: None.

None
Source code in app/metrics/summary.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
@register_command("list")
def models(cfg: GolemConfig, mode: str = None):
    r"""
    Lists available model archives.

    Args:
        cfg (GolemConfig): The centralized application configuration object.
        mode (str, optional): The specific mode to list models for (e.g., "basic", "fluid"). 
            If ``None``, it evaluates against all available modes. Default: ``None``.
    """
    base_model_dir = Path(resolve_path(cfg.data.dirs["model"]))

    modes_to_check = [mode] if mode else list(cfg.config.keys())

    logger.info("======================================================")
    logger.info("Available Golem Models")
    logger.info("======================================================")

    found_any = False
    for m in modes_to_check:
        m_dir = base_model_dir / m
        if m_dir.exists() and m_dir.is_dir():
            models = list(m_dir.glob("*.pth"))
            if models:
                found_any = True
                logger.info(f"Mode: {m.upper()}")
                for mod in sorted(models, key=lambda f: f.stat().st_mtime, reverse=True):
                    # Calculate size
                    size_mb = mod.stat().st_size / (1024 * 1024)
                    logger.info(f"  - {mod.name} ({size_mb:.2f} MB)")

    if not found_any:
        logger.info("No models found.")
    logger.info("======================================================")

summary(cfg, module_name=None)

Prints a detailed architectural summary of the active brain configuration.

This function instantiates the LNN based on the current configuration and uses the torchinfo package to perform a dummy forward pass. It displays the exact tensor dimensions at each layer, the parameter counts, and validates that the multi-modal sensor fusion layers are properly scaling and concatenating into the Liquid Core.

Parameters:

Name Type Description Default
cfg GolemConfig

The centralized application configuration object.

required
module_name str

Ignored. Included for CLI compatibility.

None
Source code in app/metrics/summary.py
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
@register_command("summary")
def summary(cfg: GolemConfig, module_name: str = None):
    r"""
    Prints a detailed architectural summary of the active brain configuration.

    This function instantiates the LNN based on the current configuration and uses the `torchinfo` package to perform a dummy forward pass. It displays the exact tensor dimensions at each layer, the parameter counts, and validates that the multi-modal sensor fusion layers are properly scaling and concatenating into the Liquid Core.

    Args:
        cfg (GolemConfig): The centralized application configuration object.
        module_name (str, optional): Ignored. Included for CLI compatibility.
    """
    try:
        import torchinfo
    except ImportError:
        logger.error("torchinfo is required for the summary command. Run: pip install torchinfo")
        return

    device = torch.device('mps') if torch.backends.mps.is_available() else torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

    # 1. Base defaults
    active_profile = cfg.brain.mode
    model_dir = Path(resolve_path(cfg.data.dirs["model"])) / active_profile
    active_model_path = Path(resolve_path(cfg.data.dirs["training"])) / active_profile / "golem.pth"
    n_actions = cfg.training.action_space_size 

    # 2. Discover architecture from archives
    archives = list(model_dir.glob("*.pth"))
    apply_latest_parameters(cfg, archives)

    # 3. Discover action space and load state dict (if it exists)
    if active_model_path.exists():
        try:
            state_dict = torch.load(str(active_model_path), map_location=device, weights_only=True)
            if 'output.weight' in state_dict:
                n_actions = state_dict['output.weight'].shape[0]
        except Exception as e:
            logger.warning(f"Could not load state dict from {active_model_path}: {e}")

    model = DoomLiquidNet(
        n_actions=n_actions,
        cortical_depth=cfg.brain.cortical_depth,
        working_memory=cfg.brain.working_memory,
        sensors=cfg.brain.sensors,
        dsp_config=cfg.brain.dsp
    ).to(device)

    # 4. Construct Multi-Modal Dummy Tensors
    seq_len = cfg.training.sequence_length
    batch_size = 1 

    c_vis = 4 if cfg.brain.sensors.depth else 3
    x_vis = torch.randn(batch_size, seq_len, c_vis, 64, 64).to(device)

    x_aud = None
    if cfg.brain.sensors.audio:
        # Calculate raw audio samples per frame (44100 Hz / 35 FPS = 1260)
        audio_samples_per_frame = int(cfg.brain.dsp.sample_rate / 35)
        # Dummy tensor now represents raw waveforms, not spectrograms
        x_aud = torch.randn(batch_size, seq_len, 2, audio_samples_per_frame).to(device)

    x_thm = None
    if cfg.brain.sensors.thermal:
        x_thm = torch.randn(batch_size, seq_len, 1, 64, 64).to(device)

    # Strip None values to prevent torchinfo memory calculation crashes
    input_dict = {"x_vis": x_vis}
    if x_aud is not None:
        input_dict["x_aud"] = x_aud
    if x_thm is not None:
        input_dict["x_thm"] = x_thm

    logger.info("======================================================")
    logger.info(f"Generating Architectural Summary for Profile: {active_profile.upper()}")
    logger.info(f"Sensors Enabled -> Visual: True | Depth: {cfg.brain.sensors.depth} | Audio: {cfg.brain.sensors.audio} | Thermal: {cfg.brain.sensors.thermal}")
    logger.info("======================================================")

    torchinfo.summary(
        model, 
        input_data=input_dict, 
        col_names=["input_size", "output_size", "num_params", "trainable"],
        depth=3
    )