CEEMDAN multiscale entropy (MSE) - SVM fault diagnosis of rolling bearing

1. CEEMDAN, a complete set of empirical mode decomposition, will not be introduced in this paper. This algorithm is improved by EEMD and EMD, which effectively suppresses the problems of mode aliasing to a certain extent.

  1. Add the IMF component with auxiliary noise after EMD decomposition, instead of adding the Gaussian white noise signal directly to the original signal;

  2. EEMD decomposition and CEEMD decomposition are the overall average of the modal components obtained after empirical mode decomposition. CEEMDAN decomposition performs the overall average calculation after the first-order IMF component is obtained to obtain the final first-order IMF component, and then repeat the above operations for the residual part, so as to effectively solve the problem of white noise transfer from high frequency to low frequency.

2. Multiscale entropy

The basic principle of multi-scale entropy MSE is to coarsen or down sample the time series, that is, it mainly analyzes the time series under more and more rough time resolution.

def MSE(signal , max_scale):
    result = []
    length = len(signal)
    std = np.std(signal)

    for scale in range(1 , max_scale + 1):
        # Determine the length of the intercept
        length = int(len(signal) / scale) - 1
        # Segment averaging
        scale_i = signal[ : len(signal) : scale][:length]
        for i in range(1,scale):
            scale_i = scale_i + signal[i: len(signal) : scale][:length]
        scale_i = scale_i / scale
        #Calculate sample entropy
        result.append(Sample_Entropy(scale_i, std ,r = 0.15))
        print("scale:" , scale, 'SampEn' , result[-1])
    return result

Attach the sample entropy code

def Sample_Entropy(x, m, r=0.15):
    
    
    x = np.array(x)
    # Check whether x is one-dimensional data
    if x.ndim != 1:
        raise ValueError("x The dimension of is not one dimension")
    # Whether the number of rows for calculating x is less than m+1
    if len(x) < m+1:
        raise ValueError("len(x)less than m+1")
    # Divide x into m windows
    entropy = 0  
    for temp in range(2):
        X = []
        for i in range(len(x)-m+1-temp):
            X.append(x[i:i+m+temp])
        X = np.array(X)
        # Calculate the maximum value of the absolute value of the difference between any row of data in X and the index data corresponding to all rows of data
        D_value = []  
        for index1, i in enumerate(X):
            sub = []
            for index2, j in enumerate(X):
                if index1 != index2:
                    sub.append(max(np.abs(i-j)))
            D_value.append(sub)
        # Calculate threshold
        F = r*np.std(x, ddof=1)
        # Judgment D_ The ratio of the number of values in each row in value greater than the threshold divided by len(x)-m+1
        num = np.sum(D_value>F, axis=1)/(len(X)-m+1-temp)
        # Calculate the logarithmic average of num
        Lm = np.average(np.log(num))
        entropy = abs(entropy) - Lm

    return entropy

Note: r you can set it yourself. The specific range is usually 0.1-0.25

3. Reconstruct the CEEMDAN decomposed signal by using the correlation coefficient.

def P(x,y):
    x1 = np.sum(x) / len(x)
    y1 = np.sum(y) / len(y)
    p = np.sum((x-x1)*(y-y1)) / (np.sqrt(np.sum((x-x1)**2))*np.sqrt(np.sum((y-y1)**2)))
    return p

k1 = []
k2 = []
k3 = []
k4 = []
k5 = []
k6 = []
k7 = []
k8 = []
k9 = []
k10 = []
for i in range(400):
    data = data1[i]
    ceemdan = CEEMDAN()
    eIMFs = ceemdan.ceemdan(data)
    nIMFs = eIMFs.shape[0]
    Y = []
    for j in range(nIMFs):
        p = P(data,eIMFs[j])
        if p > 0.1:
           Y.append(eIMFs[j])
    y_c = np.zeros(2048)
    for k in range(len(Y)):
        y_c = y_c+Y[k]
    print(y_c)
    mse = MSE(y_c,10)
    k1.append(mse[0])
    k2.append(mse[1])
    k3.append(mse[2])
    k4.append(mse[3])
    k5.append(mse[4])
    k6.append(mse[5])
    k7.append(mse[6])
    k8.append(mse[7])
    k9.append(mse[8])
    k10.append(mse[9])

P is the calculated correlation coefficient, which is retained when it is greater than 0.1.

Note: this time is to calculate 10 entropy values. I haven't learned how to save multi-dimensional arrays directly into the table, so I saved ten texts and operated them manually. Hahaha, if I learn to share again one day, it won't be so troublesome.

4. The bearing fault diagnosis data of Western Reserve University with experimental data damage of 0.018 and rotating speed of 1797r/min are selected. 100 data are taken for outer ring fault, inner ring fault, rolling element fault and normal state respectively, and the length of each data is 2048 points.

5. Complete code

from PyEMD import CEEMDAN
import numpy as np
import matplotlib.pyplot as plt
import os
import glob


path = r'E:/CEEMDAN-SSA-SVM/0.018_0hp/'
def read_txt(path):
    cate = [path + x for x in os.listdir(path) if os.path.isdir(path + x)]
    imgs = []
    labels = []
    for idx, folder in enumerate(cate):
        for im in glob.glob(folder + '/*.txt'):
            img = np.loadtxt(im)
            imgs.append(img)
            labels.append(idx)
    return np.asarray(imgs, np.float32), np.asarray(labels, np.int32)


data1, label = read_txt(path)
print(label)  


def Sample_Entropy(x, m, r=0.15):
    """
    Sample entropy
    m Length of window when sliding
    r The value range of threshold coefficient is generally 0.1~0.25
    """
    # Convert x to array
    x = np.array(x)
    # Check whether x is one-dimensional data
    if x.ndim != 1:
        raise ValueError("x The dimension of is not one dimension")
    # Whether the number of rows for calculating x is less than m+1
    if len(x) < m+1:
        raise ValueError("len(x)less than m+1")
    # Divide x into m windows
    entropy = 0  # Approximate entropy
    for temp in range(2):
        X = []
        for i in range(len(x)-m+1-temp):
            X.append(x[i:i+m+temp])
        X = np.array(X)
        # Calculate the maximum value of the absolute value of the difference between any row of data in X and the index data corresponding to all rows of data
        D_value = []  # Storage difference
        for index1, i in enumerate(X):
            sub = []
            for index2, j in enumerate(X):
                if index1 != index2:
                    sub.append(max(np.abs(i-j)))
            D_value.append(sub)
        # Calculate threshold
        F = r*np.std(x, ddof=1)
        # Judgment D_ The ratio of the number of values in each row in value greater than the threshold divided by len(x)-m+1
        num = np.sum(D_value>F, axis=1)/(len(X)-m+1-temp)
        # Calculate the logarithmic average of num
        Lm = np.average(np.log(num))
        entropy = abs(entropy) - Lm

    return entropy


def MSE(signal , max_scale:int = 20):
    result = []
    length = len(signal)
    # std = np.std(signal)

    for scale in range(1 , max_scale + 1):
        # Determine the length of the intercept
        length = int(len(signal) / scale) - 1
        # Segment averaging
        scale_i = signal[ : len(signal) : scale][:length]
        for i in range(1,scale):
            scale_i = scale_i + signal[i: len(signal) : scale][:length]
        scale_i = scale_i / scale
        #Calculate sample entropy
        result.append(Sample_Entropy(scale_i, 2 ,r = 0.15))
        print("scale:" , scale, 'SampEn' , result[-1])
    return result


def P(x,y):
    x1 = np.sum(x) / len(x)
    y1 = np.sum(y) / len(y)
    p = np.sum((x-x1)*(y-y1)) / (np.sqrt(np.sum((x-x1)**2))*np.sqrt(np.sum((y-y1)**2)))
    return p

k1 = []
k2 = []
k3 = []
k4 = []
k5 = []
k6 = []
k7 = []
k8 = []
k9 = []
k10 = []
for i in range(400):
    data = data1[i]
    ceemdan = CEEMDAN()
    eIMFs = ceemdan.ceemdan(data)
    nIMFs = eIMFs.shape[0]
    Y = []
    for j in range(nIMFs):
        p = P(data,eIMFs[j])
        if p > 0.1:
           Y.append(eIMFs[j])
    y_c = np.zeros(2048)
    for k in range(len(Y)):
        y_c = y_c+Y[k]
    print(y_c)
    mse = MSE(y_c,10)
    k1.append(mse[0])
    k2.append(mse[1])
    k3.append(mse[2])
    k4.append(mse[3])
    k5.append(mse[4])
    k6.append(mse[5])
    k7.append(mse[6])
    k8.append(mse[7])
    k9.append(mse[8])
    k10.append(mse[9])

np.savetxt('E:/CEEMDAN-SSA-SVM/MSE1.txt',k1)
np.savetxt('E:/CEEMDAN-SSA-SVM/MSE2.txt',k2)
np.savetxt('E:/CEEMDAN-SSA-SVM/MSE3.txt',k3)
np.savetxt('E:/CEEMDAN-SSA-SVM/MSE4.txt',k4)
np.savetxt('E:/CEEMDAN-SSA-SVM/MSE5.txt',k5)
np.savetxt('E:/CEEMDAN-SSA-SVM/MSE6.txt',k6)
np.savetxt('E:/CEEMDAN-SSA-SVM/MSE7.txt',k7)
np.savetxt('E:/CEEMDAN-SSA-SVM/MSE8.txt',k8)
np.savetxt('E:/CEEMDAN-SSA-SVM/MSE9.txt',k9)
np.savetxt('E:/CEEMDAN-SSA-SVM/MSE10.txt',k10)
import pandas as pd
from sklearn.multiclass import OneVsRestClassifier
from sklearn.svm import SVC
import numpy as np
import pandas
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix, recall_score
from sklearn.model_selection import GridSearchCV
from sklearn.model_selection import cross_val_score
import matplotlib.pyplot as plt
import matplotlib as mpl
import os
import glob
mpl.rcParams['font.sans-serif'] = ['KaiTi']   # Ensure normal display of Chinese
mpl.rcParams['font.serif'] = ['KaiTi']        # Ensure normal display of Chinese
mpl.rcParams['axes.unicode_minus'] = False    # Ensure that the negative sign is displayed normally
import warnings
warnings.filterwarnings("ignore")

x = []
y = []
data = open('E:/CEEMDAN-SSA-SVM/tezheng.csv',encoding='UTF-8-sig')
for line in data.readlines():
    lineArr = line.strip().split(',')
    # data_train.append(lineArr)
    x.append(lineArr[1:11])
    y.append(lineArr[0])
x_train,x_test,y_train,y_test = train_test_split(x,y,test_size=0.3)
print(y_test)


model2 = SVC(kernel='rbf')
clf2 = model2.fit(x_train, y_train)

print("rbf Radial basis kernel function-Training set:", model2.score(x_train, y_train))
print("rbf Radial basis kernel function-Test set:", model2.score(x_test, y_test))

print(model2.decision_function(x_train))
# Use predict() to view the prediction results
print(model2.predict(x_train))


# Output confusion matrix
predictions = model2.predict(x_test)
cm = confusion_matrix(y_test, predictions)
print(cm)

# 5-fold cross validation
score_SVM = cross_val_score(model2, x_test, y_test, cv=5)  # 50% off cross validation
print(score_SVM)
print('score mean value', np.mean(score_SVM))  # Mean value of 50% cross validation


# Draw confusion matrix
print('Prediction label', predictions)
print('Real label', y_test)
classes = list(map(int, y_test))
# Sort and accurately classify the results
classes.sort()
# By comparison, the confusion matrix is obtained
confusion = confusion_matrix(y_test, predictions)
# You can also specify the color behind the gray block_ X reverse color is also OK
plt.imshow(confusion, cmap=plt.cm.Blues)
# This thing needs attention
# ticks this is the coordinate point on the coordinate axis
# label this is the annotation description of the coordinate axis
indices = range(len(confusion))
# Coordinate position placement
# The first is the iteration object, which represents the order of coordinates
# The second is the array of values displayed by coordinates. The first one is actually the index of the array of numbers displayed by coordinates, but remember that it must be an iterative object
plt.xticks(indices, ['ball', 'inner', 'normal', 'outer'])
plt.yticks(indices, ['ball', 'inner', 'normal', 'outer'])
# Heat indicator? It's the pregnancy test stick next to it
plt.colorbar()
# This is the meaning of the coordinate axis
plt.xlabel('y_test')
plt.ylabel('predictions')
plt.title('Confusion matrix')
# Display data, more intuitive
for first_index in range(len(confusion)):
    for second_index in range(len(confusion[first_index])):
        plt.text(first_index, second_index, confusion[first_index][second_index])

# display
plt.show()

 

Tags: Python Machine Learning Data Analysis svm

Posted by jdorma0 on Tue, 19 Apr 2022 09:30:14 +0930