Does it make sense to use cross-correlation on arrays of timestamps?
I want to find the offset between two arrays of timestamps. They could represent, let's say, the onset of beeps in two audio tracks.

Note: There may be extra or missing onsets in either track.

I found some information about cross-correlation (e.g. https://dsp.stackexchange.com/questions/736/how-do-i-implement-cross-correlation-to-prove-two-audio-files-are-similar) which looked promising.

I assumed that each audio track is 10 seconds in duration, and represented the beep onsets as the peaks of a "square wave" with a sample rate of 44.1 kHz:

import numpy as np

rfft = np.fft.rfft
irfft = np.fft.irfft

track_1 = np.array([..., 5.2, 5.5, 7.0, ...])
# The onset in track_2 at 8.0 is "extra," it has no
# corresponding onset in track_1
track_2 = np.array([..., 7.2, 7.45, 8.0, 9.0, ...])

frequency = 44100
num_samples = 10 * frequency

# Impulse trains: a 1 at each onset sample, 0 elsewhere
wave_1 = np.zeros(num_samples)
wave_1[(track_1 * frequency).astype(int)] = 1
wave_2 = np.zeros(num_samples)
wave_2[(track_2 * frequency).astype(int)] = 1

# Circular cross-correlation via the FFT; the peak index is the offset
# in samples (modulo num_samples)
xcor = irfft(rfft(wave_1) * np.conj(rfft(wave_2)))
offset = xcor.argmax()

This approach isn't particularly fast, but I was able to get fairly consistent results even with quite low sample rates. However, I have no idea whether this is a good approach! Is there a better way to find this offset than cross-correlation?
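
For reference, here is a self-contained sketch of the approach above on made-up onsets (the timestamps below are invented purely for illustration); the only addition is converting the circular-correlation peak, which can wrap around the end of the array, into a signed offset in seconds:

import numpy as np

rate = 44100                     # samples per second, as above
num_samples = 10 * rate          # 10-second tracks

# Invented onsets (seconds): track_2 is track_1 delayed by 20 ms
true_delay = 0.020
track_1 = np.array([0.5, 2.0, 3.7, 6.1, 8.4])
track_2 = track_1 + true_delay

# Impulse trains: a 1 at each onset sample, 0 elsewhere
wave_1 = np.zeros(num_samples)
wave_1[np.round(track_1 * rate).astype(int)] = 1
wave_2 = np.zeros(num_samples)
wave_2[np.round(track_2 * rate).astype(int)] = 1

# Circular cross-correlation via the FFT, as above
xcor = np.fft.irfft(np.fft.rfft(wave_1) * np.conj(np.fft.rfft(wave_2)))

# The peak index is (t1 - t2) in samples modulo num_samples, so indices past
# the midpoint are really negative lags
lag = xcor.argmax()
if lag > num_samples // 2:
    lag -= num_samples
print('Estimated delay of track_2 (s):', -lag / rate)   # ~0.02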

Edit: added note about missing and extra onsets.

Bengali answered 5/10, 2016 at 15:9 Comment(2)
You show track_1 and track_2 as irregularly spaced, then you multiply them by frequency when building wave_1 and wave_2. Are track_1 and track_2 supposed to be the timestamps you're trying to correlate, or are they supposed to be the audio waveforms without the beeps added? Or are they the onset times of the "beep"? – Microprint
track_1 and track_2 are the onset times of each beep. wave_1 and wave_2 are each, if you like, a summation of Dirac delta functions for the purposes of finding the cross-correlation. – Bengali

If track_1 and track_2 are timestamps of beeps and both catch all of the beeps, then there's no need to build a waveform and do cross-correlation. Just find the average delay between the two arrays of beep timestamps:

import numpy as np

frequency = 44100
num_samples = 10 * frequency
actual_time_delay = 0.020
timestamp_noise = 0.001

# Assumes track_1 and track_2 are timestamps of beeps (in samples),
# with a delay and noise added to track_2
track_1 = np.array([1, 2, 10000])
track_2 = (np.array([1, 2, 10000]) + actual_time_delay * frequency
           + frequency * np.random.uniform(-timestamp_noise, timestamp_noise, len(track_1)))

calculated_time_delay = np.mean(track_2 - track_1) / frequency
print('Calculated time delay (s):', calculated_time_delay)

This yields approximately 0.020 s depending on the random errors introduced and is about as fast as it gets.

EDIT: If extra or missing timestamps need to be handled, the code can be modified as follows. Essentially, if the error on each timestamp is bounded, compute the delays between all pairs of values and take their statistical mode: delays that fall within the same noise bound are clustered together, and the original delays in the winning cluster are then averaged.

import numpy as np
from scipy.stats import mode

frequency = 44100
num_samples = 10 * frequency
actual_time_delay = 0.020

# Assumes track_1 and track_2 are timestamps of beeps (in samples), with a delay,
#   noise added, and extra/missing beeps
timestamp_noise = 0.001
timestamp_noise_threshold = 0.002
track_1 = np.array([1, 2, 5, 10000, 100000, 200000])
track_2 = np.array([1, 2, 44, 10000, 30000]) + actual_time_delay * frequency
track_2 = track_2 + frequency * np.random.uniform(-timestamp_noise, timestamp_noise, len(track_2))

# Delays between every pair of timestamps, in seconds
deltas = []
for t1 in track_1:
    for t2 in track_2:
        deltas.append(t2 - t1)
sorted_deltas = np.sort(deltas) / frequency

# Bin each delay to the nearest multiple of the noise threshold so that
# matching pairs fall into the same cluster
truncated_deltas = np.round(sorted_deltas / timestamp_noise_threshold).astype(int) * timestamp_noise_threshold

# np.atleast_1d keeps this working across SciPy versions, where mode() may
# return either a length-1 array or a scalar
truncated_time_delay = min(np.atleast_1d(mode(truncated_deltas).mode), key=abs)
calculated_time_delay = np.mean(sorted_deltas[truncated_deltas == truncated_time_delay])

print('Calculated time delay (s):', calculated_time_delay)

Obviously, if too many beeps are missing or too many extra beeps exist, the approach begins to fail at some point, but in general it should work well, and it is much faster than generating a whole waveform and performing correlation.

Microprint answered 12/10, 2016 at 14:16 Comment(4)
I can't examine this more closely at the moment, but it looks like this won't hold up to other kinds of "noise" in the tracks, like missing values or extra values. Is that right? – Bengali
So it needs to be able to handle missing/extra beeps too? You're correct, it won't handle that. – Microprint
Yes, sorry. I should have made that clear up front. I'll edit the question. – Bengali
I'll run it myself tomorrow to make sure it works, but it looks promising. Thanks very much! – Bengali

Yes, it does make sense. This is commonly done in MATLAB. Here is a link to a similar application:

http://www.mathworks.com/help/signal/ug/cross-correlation-of-delayed-signal-in-noise.html

One consideration:

Cross-correlation is commonly used when the signal in question is noisy. If you don't have any noise to worry about, however, I would use a different method.
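
A rough SciPy counterpart of MATLAB's xcorr for this problem is scipy.signal.correlate. Here is a minimal sketch on impulse trains built from made-up onset times (all values below are invented for illustration):

import numpy as np
from scipy.signal import correlate, correlation_lags

rate = 44100
num_samples = 10 * rate

# Invented onsets (seconds): track_2 is track_1 delayed by 20 ms
track_1 = np.array([0.5, 2.0, 3.7, 6.1, 8.4])
track_2 = track_1 + 0.020

# Impulse trains: a 1 at each onset sample, 0 elsewhere
wave_1 = np.zeros(num_samples)
wave_1[np.round(track_1 * rate).astype(int)] = 1
wave_2 = np.zeros(num_samples)
wave_2[np.round(track_2 * rate).astype(int)] = 1

# Full (non-circular) cross-correlation; correlation_lags maps each output
# index to its lag in samples
corr = correlate(wave_2, wave_1, mode='full', method='fft')
lags = correlation_lags(len(wave_2), len(wave_1), mode='full')
print('Estimated delay of track_2 (s):', lags[np.argmax(corr)] / rate)   # ~0.02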

Dialectician answered 7/10, 2016 at 15:34 Comment(4)
Well, my concern is the "square wave" part of it. If the onset array track_1 is [1, 2, 1000000], it will create a much larger input (wave_1) array than [1, 2, 3] even though both have 3 elements. – Bengali
"If you don't have any noise to worry about, however, I would use a different method." Can you recommend a different method? – Bengali
Suppose I had two functions f1() and g(), where g() is the known pattern without noise and f1() is your audio signal. I would slide g() over f1() sample-wise and compute their difference h1(S) = integral[f1(s) - g(s - S) ds]. Then I would find all the local maxima. Those are the locations where the feature exists. Do the same thing for h2(S) = integral[f2(s) - g(s - S) ds]. Then shift h2_max() over h1_max() until you get the lowest horizontal difference between your local maxima. – Dialectician
I think I see what you mean but I'm not sure. Could you edit your answer with a dummy implementation? – Bengali
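
One possible reading of the procedure described in the comment above, as a rough dummy implementation: slide a known beep template over each (toy, synthesised) track, keep local maxima of the match score as detected onsets, then shift one set of detected onsets over the other and pick the shift with the lowest total horizontal difference. Correlation is used here as the match score instead of the difference integral, and every waveform, name, and onset time below is invented purely for illustration:

import numpy as np

rate = 44100
num_samples = 10 * rate

# g(): a known 10 ms, 1 kHz beep template
beep_len = int(0.010 * rate)
template = np.sin(2 * np.pi * 1000 * np.arange(beep_len) / rate)

def synth(onsets):
    # Toy audio: the beep template placed at each onset time, plus a little noise
    x = 0.05 * np.random.randn(num_samples)
    for onset in onsets:
        i = int(round(onset * rate))
        x[i:i + beep_len] += template[:num_samples - i]
    return x

def detect_onsets(x, threshold=0.5):
    # Matched-filter step: slide the template over x and keep points that are
    # above a threshold and are the largest score within +/- one template
    # length (the "local maxima" of the comment)
    score = np.correlate(x, template, mode='valid')
    score /= np.abs(score).max()
    peaks = []
    for i in np.flatnonzero(score > threshold):
        window = score[max(0, i - beep_len):i + beep_len + 1]
        if score[i] == window.max():
            peaks.append(i)
    return np.array(peaks)

def best_shift(onsets_1, onsets_2):
    # Shift one set of detected onsets over the other and keep the shift with
    # the lowest total horizontal difference (nearest-neighbour distances)
    candidates = (onsets_2[:, None] - onsets_1[None, :]).ravel()
    costs = [np.abs(onsets_2[:, None] - (onsets_1[None, :] + c)).min(axis=1).sum()
             for c in candidates]
    return candidates[int(np.argmin(costs))]

# Invented onset times (seconds): track_2 is track_1 delayed by 20 ms
track_1 = [0.5, 2.0, 3.7, 6.1, 8.4]
track_2 = [onset + 0.020 for onset in track_1]
shift = best_shift(detect_onsets(synth(track_1)), detect_onsets(synth(track_2)))
print('Estimated delay of track_2 (s):', shift / rate)   # ~0.02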
