QRS peak detection#
Detection of cardiac cycles from ECG signals
# setup
import sys
import numpy as np
import scipy.signal as sp
from matplotlib import pyplot as plt
import wfdb
Download records#
Identify and download records in the MIMIC III Waveform Database
# Select the first record
selected_record = '3000063_0013'
database_name = 'mimic3wdb/1.0/30/3000063/'
print("Selected record: {}".format(selected_record))
# load data from this record
start_seconds = 20
no_seconds_to_load = 10
fs = 125
record_data = wfdb.rdrecord(record_name = selected_record, sampfrom = fs*start_seconds, sampto = fs*(start_seconds + no_seconds_to_load), pn_dir = database_name)
print("{} seconds of data loaded from: {}".format(no_seconds_to_load, selected_record))
# Plot the data loaded from this record
title_text = "First record from " + database_name + " (record: " + selected_record + ")"
wfdb.plot_wfdb(record=record_data, title=title_text, time_units='seconds')
Selected record: 3000063_0013
10 seconds of data loaded from: 3000063_0013
Separate records#
Separate ECG, PPG and ABP signals
# ECG, PPG and ABP extraction
h = 0
for i in record_data.sig_name:
if i == 'II':
print('ECG at position ' + str(h))
ecg = record_data.p_signal[:,h]
elif i == 'PLETH':
print('PPG at position ' + str(h))
ppg = record_data.p_signal[:,h]
elif i == 'ABP':
print('ABP at position ' + str(h))
abp = record_data.p_signal[:,h]
else:
print('Other signal (' + i + ') at position ' + str(h))
h = h + 1
t = np.arange(start_seconds,(start_seconds + no_seconds_to_load),1.0/fs)
fig, (ax1,ax2,ax3) = plt.subplots(3, 1, sharex = True, sharey = False)
ax1.plot(t, ecg)
ax1.set(xlabel = '', ylabel = 'ECG')
ax2.plot(t, ppg)
ax2.set(xlabel = '', ylabel = 'PPG')
ax3.plot(t, abp)
ax3.set(xlabel = 'Time [s]', ylabel = 'ABP')
ECG at position 0
ABP at position 1
PPG at position 2
[Text(0.5, 0, 'Time [s]'), Text(0, 0.5, 'ABP')]
# Remove NaNs
x, = np.where(np.isnan(ecg))
if len(x) != 0:
print('NaNs in ECG: ' + str(x))
for i in x:
ecg[i] = ecg[i - 1]
x, = np.where(np.isnan(ppg))
if len(x) != 0:
print('NaNs in PPG: ' + str(x))
for i in x:
ppg[i] = ppg[i - 1]
x, = np.where(np.isnan(abp))
if len(x) != 0:
print('NaNs in ABP: ' + str(x))
for i in x:
abp[i] = abp[i - 1]
Filter data#
# Filter ECG
sos_ecg = sp.butter(10, [0.5, 40], btype = 'bp', analog = False, output = 'sos', fs = fs)
ecg_ff = sp.sosfiltfilt(sos_ecg, ecg)
# Filter PPG
sos_ppg = sp.butter(10, [0.2, 10], btype = 'bp', analog = False, output = 'sos', fs = fs)
ppg_ff = sp.sosfiltfilt(sos_ppg, ppg)
# Filter ABP
sos_abp = sp.butter(10, 10, btype = 'low', analog = False, output = 'sos', fs = fs)
abp_ff = sp.sosfiltfilt(sos_abp, abp)
fig, (ax1,ax2,ax3) = plt.subplots(3, 1, sharex = False, sharey = False)
fig.suptitle('Filtered signals')
ax1.plot(t, ecg, color = 'red')
ax1.plot(t, ecg_ff, color = 'orange')
ax1.set_xlabel('Time [s]')
ax1.set_ylabel('ECG [V]')
ax2.plot(t, ppg, color = 'red')
ax2.plot(t, ppg_ff, color = 'orange')
ax2.set_xlabel('Time [s]')
ax2.set_ylabel('PPG [V]')
ax3.plot(t, abp, color = 'red')
ax3.plot(t, abp_ff, color = 'orange')
ax3.set_xlabel('Time [s]')
ax3.set_ylabel('ABP [V]')
plt.subplots_adjust(left = 0.1,
bottom = 0.1,
right = 0.9,
top = 0.9,
wspace = 0.4,
hspace = 0.4)
#plt.show()
QRS detection#
Define QRS detector functions#
# QRS detection function
def qrs_detect(ecg,fs,w):
"""
Description: QRS peak detection and correction
Inputs: ecg, array with ECG signal [user defined units]
fs, sampling rate of signal [Hz]
w, window length for analysis [s]
Outputs: qrs, positions of R peaks [number of samples]
n_int, number of intervals of length w in the signal
Libraries: NumPy (as np), SciPy (Signal, as sp), Matplotlib (PyPlot, as plt)
Version: 1.0 - June 2022
Developed by: Elisa Mejía-Mejía
City, University of London
"""
# Normalization of ECG
ecg_n = (ecg - np.min(ecg))/(np.max(ecg) - np.min(ecg))
ecg_n = ecg_n - np.mean(ecg_n)
# Peak detection in windows of length w
n_int = np.floor(len(ecg)/(w*fs))
for i in range(int(n_int)):
start = i*fs*w
stop = (i + 1)*fs*w - 1
#print('Start: ' + str(start) + ', stop: ' + str(stop))
aux = ecg_n[range(start,stop)]
locs, amps, nlocs = rpeakdetect(aux,fs,0.2)
locs = locs + start
if i == 0:
qrs = locs
else:
qrs = np.append(qrs,locs)
if n_int*fs*w != len(ecg_n):
start = stop + 1
stop = len(ecg_n)
aux = ecg_n[range(start,stop)]
if len(aux) > 20:
locs, amps, nlocs = rpeakdetect(aux,fs,0.2)
locs = locs + start
qrs = np.append(qrs,locs)
# Peak correction
# (1) Not peaks
med_rr = np.median(np.diff(qrs))
med_amp = np.median(ecg[qrs])
pos_for = 0
pos_rev = 0
for i in range(len(qrs)):
cur_amp = ecg[qrs[i]]
if i == 1: # Peak is at the first position of the signal
next_amp = ecg[qrs[i] + 1]
cond = cur_amp >= next_amp
#print(cur_amp, next_amp, cond)
elif i == len(ecg): # Peak is at the last position of the signal
prev_amp = ecg[qrs[i] - 1]
cond = cur_amp >= prev_amp
#print(cur_amp, prev_amp, cond)
else:
next_amp = ecg[qrs[i] + 1]
prev_amp = ecg[qrs[i] - 1]
cond = cur_amp >= prev_amp and cur_amp >= next_amp
#print(cur_amp, prev_amp, next_amp, cond)
if not(cond):
# Forward search
j = qrs[i] + 1
if i == len(qrs) - 1:
stop = len(ecg)
else:
stop = qrs[i + 1]
while j < stop:
cur_amp = ecg[j]
next_amp = ecg[j + 1]
prev_amp = ecg[j - 1]
if not(cur_amp > next_amp and cur_amp > prev_amp):
j = j + 1
else:
if ecg[j] >= 0.5*med_amp:
pos_for = j
j = stop
else:
j = j + 1
# Reverse search
j = qrs[i] - 1
if i == 0:
stop = 1
else:
stop = qrs[i - 1]
while j > stop:
cur_amp = ecg[j]
next_amp = ecg[j + 1]
prev_amp = ecg[j - 1]
if not(cur_amp > next_amp and cur_amp > prev_amp):
j = j - 1
else:
if ecg[j] >= 0.5*med_amp:
pos_rev = j
j = stop
else:
j = j - 1
# Selection of peak
if pos_for != 0 and pos_rev != 0:
pos = [pos_for, pos_rev]
if i - 1 == 0:
dif = - pos
else:
dif = qrs[i - 1] - pos
dif_med = np.abs(dif - med_rr)
#print(dif_med)
min_dif = np.min(dif_med)
min_dif, = np.where(dif_med == min_dif)
#print(min_dif,pos(min_ind))
qrs[i] = pos(min_ind)
# (2) Low peaks
med_amp = np.median(ecg[qrs])
#print(len(qrs))
result, = np.where(ecg[qrs] < 0.5*med_amp)
if len(result) > 0:
#print(result)
qrs = np.delete(np.array(qrs),result)
#print(len(qrs))
# (3) Too close or too far
rr = np.diff(qrs)
avg_rr = np.mean(rr)
med_rr = np.median(rr)
q3_rr, q1_rr = np.percentile(rr,[75, 25])
iqr_rr = q3_rr - q1_rr
#print(rr, avg_rr, med_rr, q3_rr, q1_rr, iqr_rr)
if (med_rr < 0.5*fs or avg_rr < 0.5*fs) and (iqr_rr > 0.2*fs):
result, = np.where(rr <= med_rr)
qrs = np.delete(qrs, result)
med_rr = np.median(rr)
# (a) Too close
ind, = np.where(rr < 0.5*med_rr)
if len(ind) != 0:
i = 1
while i <= len(ind):
ind = no.sort(np.append(ind, ind[i] + 1))
#print(ind)
i = i + 2
qrs = np.delete(qrs, ind)
# (b) Too far
rr = np.diff(qrs)
ind, = np.where(rr > 1.5*med_rr)
th_amp = 0.5*np.median(ecg[qrs])
#print(ind, th_amp)
if len(ind) != 0:
i = 0
while len(ind) != 0:
#ind = np.round(ind)
#qrs = np.round(qrs)
#print(i, ind[i], len(qrs))
if qrs[ind[i]] == qrs[-1]:
aux = ecg[qrs[ind[i]]:len(ecg)]
else:
aux = ecg[qrs[ind[i]]:qrs[ind[i] + 1]]
locs, _ = sp.find_peaks(aux)
ind_del, = np.where(aux[locs] < th_amp)
locs = np.delete(locs, ind_del)
if len(locs) == 0:
new = med_rr + qrs[ind[i]] - 1
elif len(locs) == 1:
new = locs + qrs[ind[i]] - 1
else:
aux_dif = np.abs(locs - med_rr)
min_dif = np.min(aux_dif)
ind_new, = np.where(aux_dif == min_dif)
new = locs[ind_new] + qrs[ind[i]] - 1
qrs = np.sort(np.append(qrs, new))
rr = np.diff(qrs)
ind, = np.where(rr > 1.5*med_rr)
rr = np.diff(qrs)
ind_del, = np.where(rr < 0.3*fs)
rr = np.delete(rr, ind_del)
#fig = plt.figure()
#plt.plot(ecg, color = 'black')
#plt.scatter(qrs,ecg[qrs],marker = 'o',color = 'red')
return qrs, n_int
# R peak detect
def rpeakdetect(ecg, fs, th):
"""
Description: QRS peak detection based on rpeakdetect2.m by G. Clifford
A batch QRS detector based upon that of Pan, Hamilton and Tompkins:
J. Pan & W. Tompkins. A real-time QRS detection algorithm. IEEE Trans Biomed Eng, vol. BME-32 NO. 3. 1985.
P. Hamilton & W. Tompkins. Quantitative Investigation of QRS Detection Rules Using the MIT/BIH Arrythmia
Database. IEEE Trans Biomed Eng, vol. BME-33, NO. 12.1986.
Inputs: ecg, array with ECG signal [user defined units]
fs, sampling rate of signal [Hz]
th, threshold for peaks in integrated waveform - Default: 0.2
Outputs: locs, positions of R peaks [number of samples]
amps, amplitudes of R peaks [user defined units]
nlocs, number of R peaks found
Libraries: NumPy (as np), SciPy (Signal, as sp), Matplotlib (PyPlot, as plt)
Version: 1.0 - June 2022
Developed by: Elisa Mejía-Mejía
City, University of London
"""
# Create time array
t = np.divide(range(0,len(ecg) - 1), fs)
# Preprocessing:
# (1) Filter data
sos = sp.butter(6, 40, btype = 'low', analog = False, output = 'sos', fs = fs)
ecg_f = sp.sosfiltfilt(sos, ecg)
# (2) Differentiate ECG
ecg_d = np.diff(ecg_f)
# (3) Square ECG
ecg_s = ecg_d*ecg_d
# (4) Integrate data
if fs >= 256:
d = np.ones(int(np.round(7*fs/256)))
else:
d = np.ones(21)
ecg_fir = sp.lfilter(d, 1, ecg_s)
ecg_med = sp.medfilt(ecg_fir, kernel_size = 9)
# Remove filter delay:
delay = np.ceil(len(d)/2)
ecg_med = ecg_med[int(delay):len(ecg_med)]
# Find peaks:
# (1) Find highest bumps in data
start_int = round((len(ecg) - 1)/4)
stop_int = round(3*(len(ecg) - 1)/4)
max_h = np.max(ecg_med[start_int:stop_int])
# (2) Determine left and right boundaries
ecg_med = np.insert(ecg_med,0,0)
ecg_med = np.append(ecg_med,0)
n_left = 0
n_right = 0
for i in range(len(ecg_med)):
if i > 0 and i < len(ecg_med):
if ecg_med[i] > (th*max_h) and ecg_med[i - 1] < (th*max_h): # left boundary
if n_left == 0:
left_bound = i - 1
n_left = 1
else:
left_bound = np.append(left_bound,i - 1)
if ecg_med[i] > (th*max_h) and ecg_med[i + 1] < (th*max_h): # right boundary
if n_right == 0:
right_bound = i - 1
n_right = 1
else:
right_bound = np.append(right_bound,i - 1)
#print(left_bound)
#print(right_bound)
# (4) Look through all possibilities
if left_bound[0] > right_bound[0]:
right_bound = np.delete(right_bound,0)
if left_bound[-1] > right_bound[-1]:
left_bound = np.delete(left_bound,0)
nlocs = 0
for i in range(len(left_bound)):
#print(i)
lb = left_bound[i]
rb, = np.where(np.array(right_bound) > lb)
if len(rb) != 0:
rb = right_bound[rb[0]]
#print(lb, rb)
amp = np.max(ecg[lb:rb])
pos, = np.where(np.array(ecg[lb:rb]) == np.amax(ecg[lb:rb]))
pos = pos[0]
pos = int(pos + lb)
#print(pos, amp)
if nlocs == 0:
locs = pos
amps = ecg[pos]
else:
locs = np.append(locs, pos)
amps = np.append(amps, ecg[pos])
nlocs = nlocs + 1
#print(locs)
#fig = plt.figure()
#plt.plot(ecg)
#plt.plot(ecg_f)
#plt.plot(ecg_d)
#plt.plot(ecg_s)
#plt.plot(ecg_fir)
#plt.plot(ecg_med)
#plt.scatter(locs, amps, marker = 'o')
return locs, amps, nlocs,
# Detect cardiac cycles
qrs, n_int = qrs_detect(ecg_ff,fs,5)
fig = plt.figure()
plt.title('QRS detection')
plt.plot(t, ecg_ff, color = 'black')
plt.scatter(start_seconds + qrs/fs, ecg_ff[qrs], color = 'red', marker = 'o')
<matplotlib.collections.PathCollection at 0x7fb7ad332950>