Python Implementation of Viterbi Algorithm
Asked Answered
C

6

43

I'm doing a Python project in which I'd like to use the Viterbi Algorithm. Does anyone know of a complete Python implementation of the Viterbi algorithm? The correctness of the one on Wikipedia seems to be in question on the talk page. Does anyone have a pointer?

Chantel answered 15/3, 2012 at 23:57 Comment(0)
C
11

Hmm I can post mine. Its not pretty though, please let me know if you need clarification. I wrote this relatively recently for specifically part of speech tagging.

class Trellis:
    trell = []
    def __init__(self, hmm, words):
        self.trell = []
        temp = {}
        for label in hmm.labels:
           temp[label] = [0,None]
        for word in words:
            self.trell.append([word,copy.deepcopy(temp)])
        self.fill_in(hmm)

    def fill_in(self,hmm):
        for i in range(len(self.trell)):
            for token in self.trell[i][1]:
                word = self.trell[i][0]
                if i == 0:
                    self.trell[i][1][token][0] = hmm.e(token,word)
                else:
                    max = None
                    guess = None
                    c = None
                    for k in self.trell[i-1][1]:
                        c = self.trell[i-1][1][k][0] + hmm.t(k,token)
                        if max == None or c > max:
                            max = c
                            guess = k
                    max += hmm.e(token,word)
                    self.trell[i][1][token][0] = max
                    self.trell[i][1][token][1] = guess

    def return_max(self):
        tokens = []
        token = None
        for i in range(len(self.trell)-1,-1,-1):
            if token == None:
                max = None
                guess = None
                for k in self.trell[i][1]:
                    if max == None or self.trell[i][1][k][0] > max:
                        max = self.trell[i][1][k][0]
                        token = self.trell[i][1][k][1]
                        guess = k
                tokens.append(guess)
            else:
                tokens.append(token)
                token = self.trell[i][1][token][1]
        tokens.reverse()
        return tokens
Creighton answered 16/3, 2012 at 0:11 Comment(4)
I am a bit confused why this is higher than the NLTK post, is their implementation incorrect? OP did you find my completely undocumented code satisfactory?Creighton
Probably the reason it is more easy to hack around to get adapted to one's needs than the NLTK code.Immoderate
@Creighton What does this function hmm.t(k,token) do? I tried to replicate the code but I could not figure out what hmm.t(k,token) do. Can you provide an example for it?Acromion
@Acromion hmm going back pretty far here, but I am pretty sure that hmm.t(k, token) is the probability of transitioning to token from state k and hmm.e(token, word) is the probability of emitting word given token. Looking at the NLTK code may be helpful as well. Honestly my post is not particularly pretty or readable.Creighton
E
35

Here's mine. Its paraphrased directly from the psuedocode implemenation from wikipedia. It uses numpy for conveince of their ndarray but is otherwise a pure python3 implementation.

import numpy as np

def viterbi(y, A, B, Pi=None):
    """
    Return the MAP estimate of state trajectory of Hidden Markov Model.

    Parameters
    ----------
    y : array (T,)
        Observation state sequence. int dtype.
    A : array (K, K)
        State transition matrix. See HiddenMarkovModel.state_transition  for
        details.
    B : array (K, M)
        Emission matrix. See HiddenMarkovModel.emission for details.
    Pi: optional, (K,)
        Initial state probabilities: Pi[i] is the probability x[0] == i. If
        None, uniform initial distribution is assumed (Pi[:] == 1/K).

    Returns
    -------
    x : array (T,)
        Maximum a posteriori probability estimate of hidden state trajectory,
        conditioned on observation sequence y under the model parameters A, B,
        Pi.
    T1: array (K, T)
        the probability of the most likely path so far
    T2: array (K, T)
        the x_j-1 of the most likely path so far
    """
    # Cardinality of the state space
    K = A.shape[0]
    # Initialize the priors with default (uniform dist) if not given by caller
    Pi = Pi if Pi is not None else np.full(K, 1 / K)
    T = len(y)
    T1 = np.empty((K, T), 'd')
    T2 = np.empty((K, T), 'B')

    # Initilaize the tracking tables from first observation
    T1[:, 0] = Pi * B[:, y[0]]
    T2[:, 0] = 0

    # Iterate throught the observations updating the tracking tables
    for i in range(1, T):
        T1[:, i] = np.max(T1[:, i - 1] * A.T * B[np.newaxis, :, y[i]].T, 1)
        T2[:, i] = np.argmax(T1[:, i - 1] * A.T, 1)

    # Build the output, optimal model trajectory
    x = np.empty(T, 'B')
    x[-1] = np.argmax(T1[:, T - 1])
    for i in reversed(range(1, T)):
        x[i - 1] = T2[x[i], i]

    return x, T1, T2
Epicycle answered 18/3, 2018 at 17:47 Comment(3)
I get errors when running this algorithm on a np.array of 12 values containing 4 possible categoriesSenate
What errors do you get? and how are you trying to call the function?Epicycle
What python packages are these functions HiddenMarkovModel.state_transition? I can't find the package "HiddenMarkovModel" in pipRambling
G
15

I found the following code in the example repository of Artificial Intelligence: A Modern Approach. Is something like this what you're looking for?

def viterbi_segment(text, P):
    """Find the best segmentation of the string of characters, given the
    UnigramTextModel P."""
    # best[i] = best probability for text[0:i]
    # words[i] = best word ending at position i
    n = len(text)
    words = [''] + list(text)
    best = [1.0] + [0.0] * n
    ## Fill in the vectors best, words via dynamic programming
    for i in range(n+1):
        for j in range(0, i):
            w = text[j:i]
            if P[w] * best[i - len(w)] >= best[i]:
                best[i] = P[w] * best[i - len(w)]
                words[i] = w
    ## Now recover the sequence of best words
    sequence = []; i = len(words)-1
    while i > 0:
        sequence[0:0] = [words[i]]
        i = i - len(words[i])
    ## Return sequence of best words and overall probability
    return sequence, best[-1]
Greybeard answered 16/3, 2012 at 0:13 Comment(0)
C
11

Hmm I can post mine. Its not pretty though, please let me know if you need clarification. I wrote this relatively recently for specifically part of speech tagging.

class Trellis:
    trell = []
    def __init__(self, hmm, words):
        self.trell = []
        temp = {}
        for label in hmm.labels:
           temp[label] = [0,None]
        for word in words:
            self.trell.append([word,copy.deepcopy(temp)])
        self.fill_in(hmm)

    def fill_in(self,hmm):
        for i in range(len(self.trell)):
            for token in self.trell[i][1]:
                word = self.trell[i][0]
                if i == 0:
                    self.trell[i][1][token][0] = hmm.e(token,word)
                else:
                    max = None
                    guess = None
                    c = None
                    for k in self.trell[i-1][1]:
                        c = self.trell[i-1][1][k][0] + hmm.t(k,token)
                        if max == None or c > max:
                            max = c
                            guess = k
                    max += hmm.e(token,word)
                    self.trell[i][1][token][0] = max
                    self.trell[i][1][token][1] = guess

    def return_max(self):
        tokens = []
        token = None
        for i in range(len(self.trell)-1,-1,-1):
            if token == None:
                max = None
                guess = None
                for k in self.trell[i][1]:
                    if max == None or self.trell[i][1][k][0] > max:
                        max = self.trell[i][1][k][0]
                        token = self.trell[i][1][k][1]
                        guess = k
                tokens.append(guess)
            else:
                tokens.append(token)
                token = self.trell[i][1][token][1]
        tokens.reverse()
        return tokens
Creighton answered 16/3, 2012 at 0:11 Comment(4)
I am a bit confused why this is higher than the NLTK post, is their implementation incorrect? OP did you find my completely undocumented code satisfactory?Creighton
Probably the reason it is more easy to hack around to get adapted to one's needs than the NLTK code.Immoderate
@Creighton What does this function hmm.t(k,token) do? I tried to replicate the code but I could not figure out what hmm.t(k,token) do. Can you provide an example for it?Acromion
@Acromion hmm going back pretty far here, but I am pretty sure that hmm.t(k, token) is the probability of transitioning to token from state k and hmm.e(token, word) is the probability of emitting word given token. Looking at the NLTK code may be helpful as well. Honestly my post is not particularly pretty or readable.Creighton
T
7

I have just corrected the pseudo implementation of Viterbi in Wikipedia. From the initial (incorrect) version, it took me a while to figure out where I was going wrong but I finally managed it, thanks partly to Kevin Murphy's implementation of the viterbi_path.m in the MatLab HMM toolbox.

In the context of an HMM object with variables as shown:

hmm = HMM()
hmm.priors = np.array([0.5, 0.5]) # pi = prior probs
hmm.transition = np.array([[0.75, 0.25], # A = transition probs. / 2 states
                           [0.32, 0.68]])
hmm.emission = np.array([[0.8, 0.1, 0.1], # B = emission (observation) probs. / 3 obs modes
                         [0.1, 0.2, 0.7]])

The Python function to run Viterbi (best-path) algorithm is below:

def viterbi (self,observations):
    """Return the best path, given an HMM model and a sequence of observations"""
    # A - initialise stuff
    nSamples = len(observations[0])
    nStates = self.transition.shape[0] # number of states
    c = np.zeros(nSamples) #scale factors (necessary to prevent underflow)
    viterbi = np.zeros((nStates,nSamples)) # initialise viterbi table
    psi = np.zeros((nStates,nSamples)) # initialise the best path table
    best_path = np.zeros(nSamples); # this will be your output

    # B- appoint initial values for viterbi and best path (bp) tables - Eq (32a-32b)
    viterbi[:,0] = self.priors.T * self.emission[:,observations(0)]
    c[0] = 1.0/np.sum(viterbi[:,0])
    viterbi[:,0] = c[0] * viterbi[:,0] # apply the scaling factor
    psi[0] = 0;

    # C- Do the iterations for viterbi and psi for time>0 until T
    for t in range(1,nSamples): # loop through time
        for s in range (0,nStates): # loop through the states @(t-1)
            trans_p = viterbi[:,t-1] * self.transition[:,s]
            psi[s,t], viterbi[s,t] = max(enumerate(trans_p), key=operator.itemgetter(1))
            viterbi[s,t] = viterbi[s,t]*self.emission[s,observations(t)]

        c[t] = 1.0/np.sum(viterbi[:,t]) # scaling factor
        viterbi[:,t] = c[t] * viterbi[:,t]

    # D - Back-tracking
    best_path[nSamples-1] =  viterbi[:,nSamples-1].argmax() # last state
    for t in range(nSamples-1,0,-1): # states of (last-1)th to 0th time step
        best_path[t-1] = psi[best_path[t],t]

    return best_path
Twombly answered 16/7, 2013 at 13:26 Comment(3)
Comment by jahrulesoverall posted incorrectly in answer, observations(0) is wrong right? should be observations[0] and observations[t]?Kitchenware
I don't understand how you don't get an error when doing psi[best_path[t],t] since best_path is type float and you can only index with ints?Diseuse
@MikeVella I added : bp = np.zeros(nSamples).astype(int)Consonance
H
5

This is an old question, but none of the other answers were quite what I needed because my application doesn't have specific observed states.

Taking after @Rhubarb, I've also re-implemented Kevin Murphey's Matlab implementation (see viterbi_path.m), but I've kept it closer to the original. I've included a simple test case as well.

import numpy as np


def viterbi_path(prior, transmat, obslik, scaled=True, ret_loglik=False):
    '''Finds the most-probable (Viterbi) path through the HMM state trellis
    Notation:
        Z[t] := Observation at time t
        Q[t] := Hidden state at time t
    Inputs:
        prior: np.array(num_hid)
            prior[i] := Pr(Q[0] == i)
        transmat: np.ndarray((num_hid,num_hid))
            transmat[i,j] := Pr(Q[t+1] == j | Q[t] == i)
        obslik: np.ndarray((num_hid,num_obs))
            obslik[i,t] := Pr(Z[t] | Q[t] == i)
        scaled: bool
            whether or not to normalize the probability trellis along the way
            doing so prevents underflow by repeated multiplications of probabilities
        ret_loglik: bool
            whether or not to return the log-likelihood of the best path
    Outputs:
        path: np.array(num_obs)
            path[t] := Q[t]
    '''
    num_hid = obslik.shape[0] # number of hidden states
    num_obs = obslik.shape[1] # number of observations (not observation *states*)

    # trellis_prob[i,t] := Pr((best sequence of length t-1 goes to state i), Z[1:(t+1)])
    trellis_prob = np.zeros((num_hid,num_obs))
    # trellis_state[i,t] := best predecessor state given that we ended up in state i at t
    trellis_state = np.zeros((num_hid,num_obs), dtype=int) # int because its elements will be used as indicies
    path = np.zeros(num_obs, dtype=int) # int because its elements will be used as indicies

    trellis_prob[:,0] = prior * obslik[:,0] # element-wise mult
    if scaled:
        scale = np.ones(num_obs) # only instantiated if necessary to save memory
        scale[0] = 1.0 / np.sum(trellis_prob[:,0])
        trellis_prob[:,0] *= scale[0]

    trellis_state[:,0] = 0 # arbitrary value since t == 0 has no predecessor
    for t in xrange(1, num_obs):
        for j in xrange(num_hid):
            trans_probs = trellis_prob[:,t-1] * transmat[:,j] # element-wise mult
            trellis_state[j,t] = trans_probs.argmax()
            trellis_prob[j,t] = trans_probs[trellis_state[j,t]] # max of trans_probs
            trellis_prob[j,t] *= obslik[j,t]
        if scaled:
            scale[t] = 1.0 / np.sum(trellis_prob[:,t])
            trellis_prob[:,t] *= scale[t]

    path[-1] = trellis_prob[:,-1].argmax()
    for t in range(num_obs-2, -1, -1):
        path[t] = trellis_state[(path[t+1]), t+1]

    if not ret_loglik:
        return path
    else:
        if scaled:
            loglik = -np.sum(np.log(scale))
        else:
            p = trellis_prob[path[-1],-1]
            loglik = np.log(p)
        return path, loglik


if __name__=='__main__':
    # Assume there are 3 observation states, 2 hidden states, and 5 observations
    priors = np.array([0.5, 0.5])
    transmat = np.array([
        [0.75, 0.25],
        [0.32, 0.68]])
    emmat = np.array([
        [0.8, 0.1, 0.1],
        [0.1, 0.2, 0.7]])
    observations = np.array([0, 1, 2, 1, 0], dtype=int)
    obslik = np.array([emmat[:,z] for z in observations]).T
    print viterbi_path(priors, transmat, obslik)                                #=> [0 1 1 1 0]
    print viterbi_path(priors, transmat, obslik, scaled=False)                  #=> [0 1 1 1 0]
    print viterbi_path(priors, transmat, obslik, ret_loglik=True)               #=> (array([0, 1, 1, 1, 0]), -7.776472586614755)
    print viterbi_path(priors, transmat, obslik, scaled=False, ret_loglik=True) #=> (array([0, 1, 1, 1, 0]), -8.0120386579275227)

Note that this implementation does not use emission probabilities directly but uses a variable obslik. Generally, emissions[i,j] := Pr(observed_state == j | hidden_state == i) for a particular observed state i, making emissions.shape == (num_hidden_states, num_obs_states).

However, given a sequence observations[t] := observation at time t, all the Viterbi Algorithm requires is the likelihood of that observation for each hidden state. Hence, obslik[i,t] := Pr(observations[t] | hidden_state == i). The actual value the of the observed state isn't necessary.

Hazelton answered 6/5, 2017 at 19:49 Comment(3)
I know that this thread is pretty old, bt can you explain what exactly the obslik variable does. Does it tell me the probability distribution of the states for each timestep? If so, in your example , obslik = np.array([emmat[:,z] for z in observations]).T gives : [[0.8 0.1 0.1 0.1 0.8] [0.1 0.2 0.7 0.2 0.1]] Does that mean that for time step 2 I have a probability of 0.1 to be hidden state 1 and a probability of 0.2 to be hidden state 2 ? If so, shouldn't the values in each column add up to 1 ? Thanks!Borgeson
Good question! No, obslik does not work like that (hence lik for "likelihood" instead of prob for "probability"). Instead it works like this: if observations[t] == j, then obslik[t, :] == emmat[:, j]. It's taking column slices of emmat. But emmat sums to 1 across its rows, not its columns. So the columns of obslik don't sum to 1 because the columns of emmat don't sum to 1.Hazelton
And how can I compute obslik ? In my particular case I have for each observation a probability distribution over the hidden states.Borgeson
D
2

I have modified @Rhubarb's answer for the condition where the marginal probabilities are already known (e.g by computing the Forward Backward algorithm).

def viterbi (transition_probabilities, conditional_probabilities):
    # Initialise everything
    num_samples = conditional_probabilities.shape[1]
    num_states = transition_probabilities.shape[0] # number of states

    c = np.zeros(num_samples) #scale factors (necessary to prevent underflow)
    viterbi = np.zeros((num_states,num_samples)) # initialise viterbi table
    best_path_table = np.zeros((num_states,num_samples)) # initialise the best path table
    best_path = np.zeros(num_samples).astype(np.int32) # this will be your output

    # B- appoint initial values for viterbi and best path (bp) tables - Eq (32a-32b)
    viterbi[:,0] = conditional_probabilities[:,0]
    c[0] = 1.0/np.sum(viterbi[:,0])
    viterbi[:,0] = c[0] * viterbi[:,0] # apply the scaling factor

    # C- Do the iterations for viterbi and psi for time>0 until T
    for t in range(1, num_samples): # loop through time
        for s in range (0,num_states): # loop through the states @(t-1)
            trans_p = viterbi[:, t-1] * transition_probabilities[:,s] # transition probs of each state transitioning
            best_path_table[s,t], viterbi[s,t] = max(enumerate(trans_p), key=operator.itemgetter(1))
            viterbi[s,t] = viterbi[s,t] * conditional_probabilities[s][t]

        c[t] = 1.0/np.sum(viterbi[:,t]) # scaling factor
        viterbi[:,t] = c[t] * viterbi[:,t]

    ## D - Back-tracking
    best_path[num_samples-1] =  viterbi[:,num_samples-1].argmax() # last state
    for t in range(num_samples-1,0,-1): # states of (last-1)th to 0th time step
        best_path[t-1] = best_path_table[best_path[t],t]
    return best_path
Diseuse answered 13/11, 2017 at 13:18 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.