Newer
Older
adaptive-nback / generators / random_block_ga.py
import itertools as it
import random


class GAOptimizedRandomGenerator:
    """Generate even random sequences according to a predefined TL ration (Ralph, 2014)"""

    def __init__(self, choices, trials, tl=2.0, pool_size=100, n=3):
        """Initialize the genetic algorithm optimizer for n-back sequences.
        :param choices:
        :param trials:
        :param tl:
        :param pool_size:
        :param n:
        """
        self.tl, self.trials, self.choices, self.pool_size, self.n = tl, trials, choices, pool_size, n
        self.pool = []
        self.__init_pool(pool_size)

    def generate(self):
        """Generate a sequence of trials based on passed parameters. TL ratio and distribution are expected to be
        close to the desired ones but not exactly the same.
        :return: a sequence of items in "list" format.
        """
        generation = 0
        best_parent = self.__find_best_parents(1)[0]
        while self.cost(best_parent) > 0.1 and generation < 1000:
            generation += 1
            if random.random() > 0.5:
                self.pool = list(map(lambda s: self.mutate(s), self.pool))
            self.pool = self.crossover_all()
            best_parent = self.__find_best_parents(1)[0]
            print(best_parent, 'cost=%f' % self.cost(best_parent))
        return best_parent

    def __append_chunk(self, prefix="", chunk_size=8):
        chunk_generation = 0
        pool = []

    def __init_pool(self, pool_size, chunk_size = 8) -> list:
        """
        Initialize solution pool.
        :param pool_size: Num of initial random solutions
        :return: initial pool of
        """
        print("Initializing the pool...")
        population = it.combinations_with_replacement(self.choices, chunk_size)
        sample = random.sample(list(population), pool_size)
        self.pool = list(map(lambda _: ''.join(_), sample))
        return self.pool

    def __find_best_parents(self, pool: list, count=1) -> list:
        """
        Find best available sequences from the current pool based on the cost function.
        :param count: Number of desired best sequences to be returned. Default is 1.
        :return: A list of most fit sequences.
        """
        sorted_pool = sorted(pool, key=lambda _: self.cost(_))
        return sorted_pool[:count]

    def distribution_cost(self, seq):
        """
        Calculate fitness according to the similarity to the desired uniform distribution.
        :param seq: a string
        :return:
        """
        costs = {c: 0.0 for c in self.choices}
        for c in list(seq):
            costs[c] += 1.0 if costs.__contains__(c) else 0.0

        # TODO instead of normalizing all, only normalize the max value
        costs = {k: abs(1.0 - v*len(self.choices)/self.trials) for k, v in costs.items()}
        return max(list(costs.values()))

    def cost(self, seq):
        """
        Calculate overall fitness (or cost) of a sequence.
        It's a cost function, so we try to minimize this cost.
        :param seq:
        :return:
        """
        # add fitness for uniform distribution of all stimuli
        # TODO merge different cost functions with weights
        return abs(self.calculate_tl_ratio(seq, self.n) - self.tl) + self.__distribution_cost(seq)

    def crossover_all(self):
        """
        Perform random crossover for all pairs.
        :return: new pool
        """
        new_pool = []
        for i in range(int(self.pool_size/2)):
            seq1 = self.pool[i*2]      # change to weighted random
            seq2 = self.pool[i*2 + 1]  # change to weighted random
            new_pool.extend(self.crossover(seq1, seq2))

        return new_pool

    def crossover(self, seq1, seq2):
        """
        Crossover two sequences.
        :param seq1:
        :param seq2:
        :return:
        """
        pos = random.randint(0, self.trials)
        return [seq1[:pos] + seq2[pos:], seq2[:pos] + seq1[pos:]]

    def mutate(self, seq):
        if random.random() > 0.5:
            pos = random.randint(0, len(seq)-1)
            seq_list = list(seq)
            seq_list[pos] = random.choice(self.choices)
            return ''.join(seq_list)
        return seq

    @staticmethod
    def calculate_tl_ratio(seq, n: int):
        """Calculates the T/L ratio in a block of trials."""
        targets = 0.0
        lures = 0.0
        for index in range(n, len(seq)):
            item = seq[index]
            if item == seq[index - n]:
                targets += 1.0
            elif item == seq[index - (n-1)] or item == seq[index - (n+1)]:
                lures += 1.0
        if lures - 0.0 < 0.001:  # avoid division by zero
            lures = 0.001
        return targets/lures


# Demo
if __name__ == '__main__':

    generator = GAOptimizedRandomGenerator(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'], trials=16, n=2)
    sq = generator.generate()
    tl_ratio = generator.calculate_tl_ratio(sq, n=2)
    even_dist = generator.distribution_cost(sq)

    print('GA-Optimized Sequence: %s' % sq, 'with tl_ratio=%f' % tl_ratio, 'and even_dist_cost=%f' % even_dist)