Source code for helion.autotuner.differential_evolution

from __future__ import annotations

import random
from typing import TYPE_CHECKING

from .base_search import FlatConfig
from .base_search import PopulationBasedSearch
from .base_search import PopulationMember
from .base_search import performance
from .base_search import population_statistics
from .effort_profile import DIFFERENTIAL_EVOLUTION_DEFAULTS
from .pattern_search import InitialPopulationStrategy

if TYPE_CHECKING:
    from collections.abc import Iterator
    from collections.abc import Sequence

    from ..runtime.config import Config
    from ..runtime.kernel import BoundKernel


class DifferentialEvolutionSearch(PopulationBasedSearch):
    """
    A search strategy that uses differential evolution to find the best config.
    """
    def __init__(
        self,
        kernel: BoundKernel,
        args: Sequence[object],
        population_size: int = DIFFERENTIAL_EVOLUTION_DEFAULTS.population_size,
        max_generations: int = DIFFERENTIAL_EVOLUTION_DEFAULTS.max_generations,
        crossover_rate: float = 0.8,
        immediate_update: bool | None = None,
        min_improvement_delta: float | None = None,
        patience: int | None = None,
        initial_population_strategy: InitialPopulationStrategy | None = None,
    ) -> None:
        """
        Create a DifferentialEvolutionSearch autotuner.

        Args:
            kernel: The kernel to be autotuned.
            args: The arguments to be passed to the kernel.
            population_size: The size of the population.
            max_generations: The maximum number of generations to run.
            crossover_rate: The crossover rate for mutation.
            immediate_update: Whether to update the population immediately after
                each evaluation (rather than once per generation).
            min_improvement_delta: Relative improvement threshold for early stopping.
                If None (default), early stopping is disabled.
            patience: Number of generations without improvement before stopping.
                If None (default), early stopping is disabled.
            initial_population_strategy: Strategy for generating the initial population.
                FROM_RANDOM generates a random population. FROM_DEFAULT starts from
                the default configuration (repeated). Can be overridden by the
                HELION_AUTOTUNER_INITIAL_POPULATION env var (handled in
                default_autotuner_fn). If None is passed, defaults to FROM_RANDOM.
        """
        super().__init__(kernel, args)
        if immediate_update is None:
            immediate_update = not bool(kernel.settings.autotune_precompile)
        if initial_population_strategy is None:
            initial_population_strategy = InitialPopulationStrategy.FROM_RANDOM
        self.initial_population_strategy = initial_population_strategy
        self.population_size = population_size
        self.max_generations = max_generations
        self.crossover_rate = crossover_rate
        self.immediate_update = immediate_update
        self.min_improvement_delta = min_improvement_delta
        self.patience = patience
        # Early stopping state
        self.best_perf_history: list[float] = []
        self.generations_without_improvement = 0
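    # Hypothetical usage sketch; ``my_kernel``, ``my_args``, and the ``bind``/
    # ``autotune`` calls below are assumptions for illustration, not defined in
    # this module:
    #
    #     bound = my_kernel.bind(my_args)
    #     search = DifferentialEvolutionSearch(
    #         bound,
    #         my_args,
    #         min_improvement_delta=0.001,  # both must be set to enable
    #         patience=5,                   # early stopping
    #     )
    #     best_config = search.autotune()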
    def mutate(self, x_index: int) -> FlatConfig:
        # Sample four indices so that, even if x_index is among them, at least
        # three distinct donors remain after it is filtered out
        a, b, c, *_ = [
            self.population[p]
            for p in random.sample(range(len(self.population)), 4)
            if p != x_index
        ]
        return self.config_gen.differential_mutation(
            self.population[x_index].flat_values,
            a.flat_values,
            b.flat_values,
            c.flat_values,
            self.crossover_rate,
        )
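    # For intuition, a minimal self-contained sketch of classic DE/rand/1/bin
    # mutation over numeric vectors. This is NOT the implementation used here
    # (``ConfigGeneration.differential_mutation`` operates on the flat config
    # space); ``de_mutation`` and ``f`` are illustrative names:
    #
    #     import random
    #
    #     def de_mutation(x, a, b, c, cr, f=0.5):
    #         # keep x[i] unless a crossover draw < cr swaps in a + f*(b - c)
    #         return [
    #             ai + f * (bi - ci) if random.random() < cr else xi
    #             for xi, ai, bi, ci in zip(x, a, b, c)
    #         ]
    #
    #     de_mutation([1.0, 2.0], [1.5, 1.0], [2.0, 3.0], [0.5, 2.5], cr=0.8)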
    def _generate_initial_population_flat(self) -> list[FlatConfig]:
        """
        Generate the initial population of flat configurations based on the strategy.

        Returns:
            A list of flat configurations for the initial population.
        """
        if self.initial_population_strategy == InitialPopulationStrategy.FROM_DEFAULT:
            # For the FROM_DEFAULT strategy, repeat the default config to fill the population
            default = self.config_gen.default_flat()
            return [default] * (self.population_size * 2)
        return self.config_gen.random_population_flat(self.population_size * 2)
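    # Shape of the result (illustrative): with population_size=3, FROM_DEFAULT
    # returns [default] * 6 while FROM_RANDOM returns six independently sampled
    # flat configs; both are 2x oversized so initial_two_generations() can
    # discard the slower half.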
    def initial_two_generations(self) -> None:
        # The initial population is 2x larger so we can throw out the slowest
        # half and give the tuning process a head start
        self.set_generation(0)
        initial_population_name = self.initial_population_strategy.name
        oversized_population = sorted(
            self.parallel_benchmark_flat(
                self._generate_initial_population_flat(),
            ),
            key=performance,
        )
        self.log(
            f"Initial population (initial_population={initial_population_name}):",
            lambda: population_statistics(oversized_population),
        )
        self.population = oversized_population[: self.population_size]
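    # The head-start trick in miniature (illustrative numbers): benchmark
    # 2 * population_size configs once, keep only the fastest half.
    #
    #     times = [3.2, 1.1, 2.7, 0.9, 5.0, 1.8]  # 2x oversized, N = 3
    #     population = sorted(times)[:3]           # -> [0.9, 1.1, 1.8]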
    def _benchmark_mutation_batch(
        self, indices: Sequence[int]
    ) -> list[PopulationMember]:
        """Mutate and benchmark the members at ``indices``, one candidate per index."""
        if not indices:
            return []
        flat_configs = [self.mutate(i) for i in indices]
        return self.parallel_benchmark_flat(flat_configs)
    def iter_candidates(self) -> Iterator[tuple[int, PopulationMember]]:
        if self.immediate_update:
            for i in range(len(self.population)):
                candidates = self._benchmark_mutation_batch([i])
                if not candidates:
                    continue
                yield i, candidates[0]
        else:
            indices = list(range(len(self.population)))
            candidates = self._benchmark_mutation_batch(indices)
            yield from zip(indices, candidates, strict=True)
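    # The two modes trade donor freshness for parallelism: with
    # immediate_update, each mutation is benchmarked on its own, so later
    # mutations can draw donors from already-replaced members (classic
    # sequential DE); the batched path benchmarks one mutant per member in
    # parallel against a frozen snapshot of the population.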
    def evolve_population(self) -> int:
        replaced = 0
        for i, candidate in self.iter_candidates():
            if self.compare(candidate, self.population[i]) < 0:
                self.population[i] = candidate
                replaced += 1
        return replaced
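    # This is DE's greedy selection operator: a trial candidate replaces its
    # parent only when compare() ranks it strictly better, so each slot in the
    # population never gets worse from one generation to the next.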
    def check_early_stopping(self) -> bool:
        """
        Check if early stopping criteria are met and update state.

        This method updates best_perf_history and generations_without_improvement,
        and returns whether the optimization should stop.

        Returns:
            True if optimization should stop early, False otherwise.
        """
        import math

        # Update history
        current_best = self.best.perf
        self.best_perf_history.append(current_best)

        if self.patience is None or len(self.best_perf_history) <= self.patience:
            return False

        # Check improvement over the last `patience` generations
        past_best = self.best_perf_history[-self.patience - 1]
        if not (
            math.isfinite(current_best)
            and math.isfinite(past_best)
            and past_best != 0.0
        ):
            return False

        relative_improvement = abs(current_best / past_best - 1.0)
        if (
            self.min_improvement_delta is not None
            and relative_improvement < self.min_improvement_delta
        ):
            # No significant improvement
            self.generations_without_improvement += 1
            if self.generations_without_improvement >= self.patience:
                self.log(
                    f"Early stopping at generation {self._current_generation}: "
                    f"no improvement >{self.min_improvement_delta:.1%} for {self.patience} generations"
                )
                return True
            return False

        # Significant improvement - reset counter
        self.generations_without_improvement = 0
        return False
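    # Worked example of the threshold above (illustrative numbers): with
    # min_improvement_delta=0.001 and patience=5, improving from
    # past_best=1.0000 to current_best=0.9995 gives
    #
    #     relative_improvement = abs(0.9995 / 1.0000 - 1.0)  # = 0.0005 < 0.001
    #
    # so this generation counts as no improvement; five such generations in a
    # row trigger the early stop.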
    def _autotune(self) -> Config:
        early_stopping_enabled = (
            self.min_improvement_delta is not None and self.patience is not None
        )
        initial_population_name = self.initial_population_strategy.name
        self.log(
            lambda: (
                f"Starting DifferentialEvolutionSearch with population={self.population_size}, "
                f"generations={self.max_generations}, crossover_rate={self.crossover_rate}, "
                f"initial_population={initial_population_name}, "
                f"early_stopping=(delta={self.min_improvement_delta}, patience={self.patience})"
            )
        )
        self.initial_two_generations()
        # Initialize early stopping tracking
        if early_stopping_enabled:
            self.best_perf_history = [self.best.perf]
            self.generations_without_improvement = 0
        # Generations 0 and 1 are accounted for by the 2x oversized initial
        # population, so the main loop starts at generation 2
        for i in range(2, self.max_generations):
            self.set_generation(i)
            self.log(f"Generation {i} starting")
            replaced = self.evolve_population()
            self.log(f"Generation {i} complete: replaced={replaced}", self.statistics)
            # Check for convergence (only if early stopping enabled)
            if early_stopping_enabled and self.check_early_stopping():
                break
        self.rebenchmark_population()
        return self.best.config