Why do NumPy correlate and corrcoef return different values, and how to "normalize" a correlate in "full" mode?

I'm trying to do some time series analysis in Python, using NumPy.

I have two somewhat medium-sized series, with 20k values each, and I want to check the sliding correlation.

The corrcoef function gives me a matrix of auto-correlation/correlation coefficients as output. That's nothing useful by itself in my case, as one of the series contains a lag.

The correlate function (in mode="full") returns a 40k-element list that DOES look like the kind of result I'm aiming for (the peak value is as far from the center of the list as the lag would indicate), but the values are all weird - up to 500, when I was expecting something from -1 to 1.

I can't just divide it all by the max value; I know the max correlation isn't 1.

How could I normalize the "cross-correlation" (correlation in "full" mode) so the returned values would be the correlation at each lag step, instead of those very large, strange values?

Freyah asked 12/4, 2011 at 17:33 Comment(0)

You are looking for normalized cross-correlation. This option isn't available yet in NumPy, but a patch that does just what you want is waiting for review. It shouldn't be too hard to apply, I would think. Most of the patch is just docstring stuff. The only lines of code that it adds are

if normalize:
    a = (a - mean(a)) / (std(a) * len(a))
    v = (v - mean(v)) / std(v)

where a and v are the input NumPy arrays whose cross-correlation you are finding. It shouldn't be hard to either add them into your own distribution of NumPy or just make a copy of the correlate function and add the lines there. I would do the latter personally if I chose to go this route.

Another, quite possibly better, alternative is to just normalize the input vectors before you send them to correlate. It's up to you which way you would like to do it.
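
For example, a minimal sketch of that second approach (the helper name norm_xcorr is mine, not from the patch):

    import numpy as np

    def norm_xcorr(a, v):
        # Apply the patch's normalization to the inputs, then correlate.
        # The single factor of len(a) makes the zero-lag autocorrelation 1.0.
        a = (np.asarray(a) - np.mean(a)) / (np.std(a) * len(a))
        v = (np.asarray(v) - np.mean(v)) / np.std(v)
        return np.correlate(a, v, mode="full")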

By the way, this does appear to be the correct normalization as per the Wikipedia page on cross-correlation, except for dividing by len(a) rather than (len(a)-1). I feel that the discrepancy is akin to the population standard deviation vs. the sample standard deviation, and really won't make much of a difference, in my opinion.
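
For reference, the zero-normalized cross-correlation that page gives (my 1-D transcription; n is the series length) is

    \frac{1}{n} \sum_{x} \frac{\left(f(x) - \mu_f\right)\left(t(x) - \mu_t\right)}{\sigma_f \sigma_t}

which has exactly one factor of n, matching the patch's single division by len(a).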

Thetos answered 12/4, 2011 at 18:04 Comment(9)
In case anyone's looking for it, the patch (still pending) is now on github. – Distinguishing
By the way... "dividing by len(a)" returns 1.0 for autocorrelation (at zero lag), which is correct. Dividing by len(a)-1 returns slightly bigger values in my tests (with Gaussian noise). – Masticatory
Just for further reference, the function xcorr in MATLAB does another normalization when using scaleopt='coeff', i.e. xcorr(a, b, 'coeff') = xcorr(a, b) / (norm(a) * norm(b)). See https://mcmap.net/q/393726/-matlab-xcorr-1d-cross-correlation-normalisation-issue for more details. – Masticatory
@Justin - Why do you normalize a by also dividing by len(a), and not do that for v? – Dupondius
@Dupondius You can normalize by len(a) in a or in v, but not in both. Also, you could normalize each by the square root of len(a). If you look at the Wikipedia page on cross-correlation, there is only one factor of len(a) in front of the equation for zero-normalized cross-correlation. – Thetos
@JustinPeel - Thanks a lot! That makes sense. – Dupondius
The Wikipedia page you alluded to says "where n is the number of pixels in t(x,y) and f(x,y)". What does "and" mean here? Addition? Multiplication? – Dermatoglyphics
It means that both numbers are equal to n (f is a subimage). – Dispatcher
The above calculation is not correct, since it takes the mean and std of all values of a and v. Depending on the input, this can lead to a significantly wrong correlation coefficient. – Selfimprovement

Based on these slides, I would suggest doing it this way:

    import numpy as np

    def cross_correlation(a1, a2):
        # Slide a2 across a1; at each lag, correlate only the overlapping
        # parts and normalize by their norms so each value lies in [-1, 1].
        a1 = np.asarray(a1)
        a2 = np.asarray(a2)
        lags = range(-len(a1) + 1, len(a2))
        cs = []
        for lag in lags:
            idx_lower_a1 = max(lag, 0)
            idx_lower_a2 = max(-lag, 0)
            idx_upper_a1 = min(len(a1), len(a1) + lag)
            idx_upper_a2 = min(len(a2), len(a2) - lag)
            b1 = a1[idx_lower_a1:idx_upper_a1]
            b2 = a2[idx_lower_a2:idx_upper_a2]
            # Guard against all-zero overlaps, which would otherwise give 0/0.
            denom = np.sqrt((b1 ** 2).sum() * (b2 ** 2).sum())
            c = np.correlate(b1, b2)[0] / denom if denom else 0.0
            cs.append(c)
        return cs
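
A quick usage sketch (with my own toy data, not from the slides): the peak lands at the lag separating the two series.

    a1 = np.array([0.0, 0.0, 1.0, 0.0, 0.0])
    a2 = np.array([1.0, 0.0, 0.0, 0.0, 0.0])
    cs = cross_correlation(a1, a2)
    lags = np.arange(-len(a1) + 1, len(a2))
    print(lags[np.argmax(cs)])  # 2: a1 lags a2 by two samples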
Lacey answered 22/1, 2018 at 10:29 Comment(0)

For full mode, would it make sense to compute corrcoef directly on the lagged signal/feature? Code:

from dataclasses import dataclass
from typing import Any, Optional, Sequence

import numpy as np

ArrayLike = Any


@dataclass
class XCorr:
    cross_correlation: np.ndarray
    lags: np.ndarray


def cross_correlation(
    signal: ArrayLike, feature: ArrayLike, lags: Optional[Sequence[int]] = None
) -> XCorr:
    """
    Computes normalized cross correlation between the `signal` and the `feature`.
    Current implementation assumes the `feature` can't be longer than the `signal`.
    You can optionally provide specific lags, if not provided `signal` is padded
    with the length of the `feature` - 1, and the `feature` is slid/padded (creating lags)
    with 0 padding to match the length of the new signal. Pearson product-moment
    correlation coefficients is computed for each lag.

    See: https://en.wikipedia.org/wiki/Cross-correlation

    :param signal: observed signal
    :param feature: feature you are looking for
    :param lags: optional lags, if not provided equals to (-len(feature), len(signal))
    """
    signal_ar = np.asarray(signal)
    feature_ar = np.asarray(feature)
    if np.count_nonzero(feature_ar) == 0:
        raise ValueError("Unsupported - feature contains only zeros")
    assert (
        signal_ar.ndim == feature_ar.ndim == 1
    ), "Unsupported - only 1d signal/feature supported"
    assert len(feature_ar) <= len(
        signal_ar
    ), "Unsupported - signal should be at least as long as the feature"
    padding_sz = len(feature_ar) - 1
    padded_signal = np.pad(
        signal_ar, (padding_sz, padding_sz), "constant", constant_values=0
    )
    lags = lags if lags is not None else range(-padding_sz, len(signal_ar), 1)
    if np.max(lags) >= len(signal_ar):
        raise ValueError("max positive lag must be shorter than the signal")
    if np.min(lags) <= -len(feature_ar):
        raise ValueError("max negative lag can't be longer than the feature")
    lagged_patterns = np.asarray(
        [
            np.pad(
                feature_ar,
                (padding_sz + lag, len(signal_ar) - lag - 1),
                "constant",
                constant_values=0,
            )
            for lag in lags
        ]
    )
    return XCorr(
        cross_correlation=np.corrcoef(padded_signal, lagged_patterns)[0, 1:],
        lags=np.asarray(lags),
    )

Example:

signal = [0, 0, 1, 0.5, 1, 0, 0, 1]
feature = [1, 0, 0, 1]
xcorr = cross_correlation(signal, feature)
assert xcorr.lags[xcorr.cross_correlation.argmax()] == 4
Smarmy answered 18/8, 2021 at 7:52 Comment(0)
