1. CEEMDAN (Complete Ensemble Empirical Mode Decomposition with Adaptive Noise) will not be introduced in detail here. The algorithm builds on EMD and EEMD and effectively suppresses mode mixing to a certain extent; a minimal usage sketch follows the list below. Its two key changes are:
- It adds the IMF components of white noise obtained by EMD decomposition as the auxiliary noise, instead of adding the Gaussian white noise signal directly to the original signal;
- EEMD and CEEMD take the ensemble average of the modal components only after the full empirical mode decomposition, whereas CEEMDAN averages as soon as each first-order IMF component is obtained, takes that as the final first-order IMF, and then repeats the operation on the residual. This effectively solves the problem of white noise transferring from high frequency to low frequency.
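For orientation, here is a minimal sketch of running CEEMDAN with the PyEMD package used later in this post (the test signal below is made up purely for illustration):

import numpy as np
from PyEMD import CEEMDAN

t = np.linspace(0, 1, 2048)
signal = np.sin(2 * np.pi * 50 * t) + 0.5 * np.random.randn(2048)  # made-up test signal

eIMFs = CEEMDAN().ceemdan(signal)  # one IMF component per row
print(eIMFs.shape)                 # (number of IMFs, 2048)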
2. Multiscale entropy
The basic idea of multiscale entropy (MSE) is to coarse-grain (downsample) the time series, i.e., to analyze the series at progressively coarser time resolutions.
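Concretely, at scale \tau the coarse-grained series averages consecutive non-overlapping windows of \tau points of the original series x of length N:

y_j^{(\tau)} = \frac{1}{\tau} \sum_{i=(j-1)\tau+1}^{j\tau} x_i, \qquad 1 \le j \le \lfloor N/\tau \rfloor

and the sample entropy of y^{(\tau)} is computed at every scale; scale 1 reduces to the ordinary sample entropy of the raw series.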
def MSE(signal, max_scale):
    result = []
    for scale in range(1, max_scale + 1):
        # Length of the coarse-grained series at this scale
        length = int(len(signal) / scale) - 1
        # Coarse-graining: average non-overlapping windows of `scale` points
        scale_i = signal[:len(signal):scale][:length]
        for i in range(1, scale):
            scale_i = scale_i + signal[i:len(signal):scale][:length]
        scale_i = scale_i / scale
        # Sample entropy of the coarse-grained series (embedding dimension m=2)
        result.append(Sample_Entropy(scale_i, 2, r=0.15))
        print("scale:", scale, "SampEn:", result[-1])
    return result
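For example, a quick call on arbitrary noise, just to show the interface (Sample_Entropy is defined below):

import numpy as np
mse = MSE(np.random.randn(2048), 10)  # list of 10 sample entropies, one per scale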
The Sample_Entropy routine it calls is attached below:
def Sample_Entropy(x, m, r=0.15):
    """
    Sample entropy.
    m: sliding-window length
    r: tolerance coefficient, usually in the range 0.1~0.25
    """
    x = np.array(x)
    # Check that x is one-dimensional
    if x.ndim != 1:
        raise ValueError("x is not one-dimensional")
    # Check that x has at least m+1 points
    if len(x) < m + 1:
        raise ValueError("len(x) is less than m+1")
    entropy = 0
    # Two passes: windows of length m (temp=0) and m+1 (temp=1)
    for temp in range(2):
        # Split x into overlapping windows of length m+temp
        X = []
        for i in range(len(x) - m + 1 - temp):
            X.append(x[i:i + m + temp])
        X = np.array(X)
        # Chebyshev distance (max absolute difference) between every pair of distinct windows
        D_value = []
        for index1, i in enumerate(X):
            sub = []
            for index2, j in enumerate(X):
                if index1 != index2:
                    sub.append(max(np.abs(i - j)))
            D_value.append(sub)
        # Tolerance threshold
        F = r * np.std(x, ddof=1)
        # Per-window ratio of distances above the threshold
        # (cast to array so the elementwise comparison works)
        num = np.sum(np.array(D_value) > F, axis=1) / (len(X) - m + 1 - temp)
        # Log-average of the ratios
        Lm = np.average(np.log(num))
        entropy = abs(entropy) - Lm
    return entropy
Note: r can be chosen by yourself; the usual range is 0.1~0.25.
3. Reconstruct the CEEMDAN-decomposed signal using the correlation coefficient.
def P(x, y):
    # Pearson correlation coefficient between x and y
    x1 = np.sum(x) / len(x)
    y1 = np.sum(y) / len(y)
    p = np.sum((x - x1) * (y - y1)) / (np.sqrt(np.sum((x - x1) ** 2)) * np.sqrt(np.sum((y - y1) ** 2)))
    return p

# One list per scale; each collects that scale's entropy for all 400 samples
k1, k2, k3, k4, k5 = [], [], [], [], []
k6, k7, k8, k9, k10 = [], [], [], [], []

for i in range(400):
    data = data1[i]
    ceemdan = CEEMDAN()
    eIMFs = ceemdan.ceemdan(data)
    nIMFs = eIMFs.shape[0]
    # Keep the IMFs whose correlation with the raw signal exceeds 0.1
    Y = []
    for j in range(nIMFs):
        p = P(data, eIMFs[j])
        if p > 0.1:
            Y.append(eIMFs[j])
    # Reconstruct the signal as the sum of the retained IMFs
    y_c = np.zeros(2048)
    for k in range(len(Y)):
        y_c = y_c + Y[k]
    print(y_c)
    # Multiscale entropy over 10 scales
    mse = MSE(y_c, 10)
    k1.append(mse[0])
    k2.append(mse[1])
    k3.append(mse[2])
    k4.append(mse[3])
    k5.append(mse[4])
    k6.append(mse[5])
    k7.append(mse[6])
    k8.append(mse[7])
    k9.append(mse[8])
    k10.append(mse[9])
P computes the correlation coefficient between each IMF and the raw signal; an IMF is retained when its coefficient is greater than 0.1.
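For reference, P implements the standard Pearson correlation coefficient between a candidate IMF y and the raw signal x:

\rho_{xy} = \frac{\sum_{i}(x_i - \bar{x})(y_i - \bar{y})}{\sqrt{\sum_{i}(x_i - \bar{x})^2}\,\sqrt{\sum_{i}(y_i - \bar{y})^2}}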
Note: ten entropy values are calculated per sample here. I haven't learned how to save a multi-dimensional array directly into one table, so I saved ten text files and merged them by hand. Haha, if I figure it out one day I'll share it, and it won't be so troublesome.
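As an aside, here is one minimal way to keep everything in a single file with plain NumPy (the file name MSE_all.txt is made up; the rows would be collected inside the loop above):

rows = []                          # one 10-value row per signal
# inside the loop above: rows.append(mse)
np.savetxt('E:/CEEMDAN-SSA-SVM/MSE_all.txt', np.array(rows))  # one sample per line, 10 columns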
4. The Case Western Reserve University bearing fault diagnosis dataset is used, with a damage size of 0.018 and a rotating speed of 1797 r/min. 100 samples are taken for each of the outer-race fault, inner-race fault, rolling-element fault, and normal states, and each sample is 2048 points long.
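The read_txt function in the complete code below assumes a directory layout along these lines (the folder names are my guess, based on the class labels used in the confusion matrix later):

E:/CEEMDAN-SSA-SVM/0.018_0hp/
    ball/      100 .txt files, 2048 points each
    inner/     100 .txt files
    normal/    100 .txt files
    outer/     100 .txt files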
5. Complete code
from PyEMD import CEEMDAN
import numpy as np
import matplotlib.pyplot as plt
import os
import glob

path = r'E:/CEEMDAN-SSA-SVM/0.018_0hp/'

def read_txt(path):
    # Each subfolder is one class; each .txt file is one 2048-point sample
    cate = [path + x for x in os.listdir(path) if os.path.isdir(path + x)]
    imgs = []
    labels = []
    for idx, folder in enumerate(cate):
        for im in glob.glob(folder + '/*.txt'):
            img = np.loadtxt(im)
            imgs.append(img)
            labels.append(idx)
    return np.asarray(imgs, np.float32), np.asarray(labels, np.int32)

data1, label = read_txt(path)
print(label)

def Sample_Entropy(x, m, r=0.15):
    """
    Sample entropy.
    m: sliding-window length
    r: tolerance coefficient, usually in the range 0.1~0.25
    """
    x = np.array(x)
    # Check that x is one-dimensional
    if x.ndim != 1:
        raise ValueError("x is not one-dimensional")
    # Check that x has at least m+1 points
    if len(x) < m + 1:
        raise ValueError("len(x) is less than m+1")
    entropy = 0
    # Two passes: windows of length m (temp=0) and m+1 (temp=1)
    for temp in range(2):
        X = []
        for i in range(len(x) - m + 1 - temp):
            X.append(x[i:i + m + temp])
        X = np.array(X)
        # Chebyshev distance between every pair of distinct windows
        D_value = []
        for index1, i in enumerate(X):
            sub = []
            for index2, j in enumerate(X):
                if index1 != index2:
                    sub.append(max(np.abs(i - j)))
            D_value.append(sub)
        # Tolerance threshold
        F = r * np.std(x, ddof=1)
        # Per-window ratio of distances above the threshold
        # (cast to array so the elementwise comparison works)
        num = np.sum(np.array(D_value) > F, axis=1) / (len(X) - m + 1 - temp)
        # Log-average of the ratios
        Lm = np.average(np.log(num))
        entropy = abs(entropy) - Lm
    return entropy

def MSE(signal, max_scale: int = 20):
    result = []
    for scale in range(1, max_scale + 1):
        # Length of the coarse-grained series at this scale
        length = int(len(signal) / scale) - 1
        # Coarse-graining: average non-overlapping windows of `scale` points
        scale_i = signal[:len(signal):scale][:length]
        for i in range(1, scale):
            scale_i = scale_i + signal[i:len(signal):scale][:length]
        scale_i = scale_i / scale
        # Sample entropy of the coarse-grained series
        result.append(Sample_Entropy(scale_i, 2, r=0.15))
        print("scale:", scale, "SampEn:", result[-1])
    return result

def P(x, y):
    # Pearson correlation coefficient between x and y
    x1 = np.sum(x) / len(x)
    y1 = np.sum(y) / len(y)
    p = np.sum((x - x1) * (y - y1)) / (np.sqrt(np.sum((x - x1) ** 2)) * np.sqrt(np.sum((y - y1) ** 2)))
    return p

# One list per scale; each collects that scale's entropy for all 400 samples
k1, k2, k3, k4, k5 = [], [], [], [], []
k6, k7, k8, k9, k10 = [], [], [], [], []

for i in range(400):
    data = data1[i]
    ceemdan = CEEMDAN()
    eIMFs = ceemdan.ceemdan(data)
    nIMFs = eIMFs.shape[0]
    # Keep the IMFs whose correlation with the raw signal exceeds 0.1
    Y = []
    for j in range(nIMFs):
        p = P(data, eIMFs[j])
        if p > 0.1:
            Y.append(eIMFs[j])
    # Reconstruct the signal as the sum of the retained IMFs
    y_c = np.zeros(2048)
    for k in range(len(Y)):
        y_c = y_c + Y[k]
    print(y_c)
    # Multiscale entropy over 10 scales
    mse = MSE(y_c, 10)
    k1.append(mse[0])
    k2.append(mse[1])
    k3.append(mse[2])
    k4.append(mse[3])
    k5.append(mse[4])
    k6.append(mse[5])
    k7.append(mse[6])
    k8.append(mse[7])
    k9.append(mse[8])
    k10.append(mse[9])

# Save one text file per scale (merged by hand afterwards, as noted above)
np.savetxt('E:/CEEMDAN-SSA-SVM/MSE1.txt', k1)
np.savetxt('E:/CEEMDAN-SSA-SVM/MSE2.txt', k2)
np.savetxt('E:/CEEMDAN-SSA-SVM/MSE3.txt', k3)
np.savetxt('E:/CEEMDAN-SSA-SVM/MSE4.txt', k4)
np.savetxt('E:/CEEMDAN-SSA-SVM/MSE5.txt', k5)
np.savetxt('E:/CEEMDAN-SSA-SVM/MSE6.txt', k6)
np.savetxt('E:/CEEMDAN-SSA-SVM/MSE7.txt', k7)
np.savetxt('E:/CEEMDAN-SSA-SVM/MSE8.txt', k8)
np.savetxt('E:/CEEMDAN-SSA-SVM/MSE9.txt', k9)
np.savetxt('E:/CEEMDAN-SSA-SVM/MSE10.txt', k10)
import numpy as np
import matplotlib.pyplot as plt
import matplotlib as mpl
from sklearn.svm import SVC
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import confusion_matrix
import warnings

warnings.filterwarnings("ignore")

mpl.rcParams['font.sans-serif'] = ['KaiTi']  # Make Chinese characters display correctly
mpl.rcParams['font.serif'] = ['KaiTi']
mpl.rcParams['axes.unicode_minus'] = False   # Make the minus sign display correctly

# Load the feature table: column 0 is the label, columns 1-10 are the ten entropy values
x = []
y = []
data = open('E:/CEEMDAN-SSA-SVM/tezheng.csv', encoding='UTF-8-sig')
for line in data.readlines():
    lineArr = line.strip().split(',')
    x.append(lineArr[1:11])
    y.append(lineArr[0])

x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.3)
print(y_test)

# SVM with RBF (radial basis function) kernel
model2 = SVC(kernel='rbf')
clf2 = model2.fit(x_train, y_train)
print("RBF kernel - training set:", model2.score(x_train, y_train))
print("RBF kernel - test set:", model2.score(x_test, y_test))
print(model2.decision_function(x_train))
# Use predict() to view the prediction results
print(model2.predict(x_train))

# Confusion matrix on the test set
predictions = model2.predict(x_test)
cm = confusion_matrix(y_test, predictions)
print(cm)

# 5-fold cross validation
score_SVM = cross_val_score(model2, x_test, y_test, cv=5)
print(score_SVM)
print('Mean cross-validation score:', np.mean(score_SVM))

# Plot the confusion matrix
print('Predicted labels:', predictions)
print('True labels:', y_test)
classes = list(map(int, y_test))  # Sort the class labels
classes.sort()
confusion = confusion_matrix(y_test, predictions)
plt.imshow(confusion, cmap=plt.cm.Blues)  # Other colormaps (e.g. gray_r) also work
# ticks are the positions on the axes; labels are their annotations
indices = range(len(confusion))
plt.xticks(indices, ['ball', 'inner', 'normal', 'outer'])
plt.yticks(indices, ['ball', 'inner', 'normal', 'outer'])
plt.colorbar()  # Color bar showing the count scale
# Rows of confusion_matrix are true labels, columns are predictions
plt.xlabel('predictions')
plt.ylabel('y_test')
plt.title('Confusion matrix')
# Write the counts into the cells (note: plt.text takes x = column index, y = row index)
for first_index in range(len(confusion)):
    for second_index in range(len(confusion[first_index])):
        plt.text(second_index, first_index, confusion[first_index][second_index])
plt.show()
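As a possible extension, here is a minimal sketch of tuning the RBF kernel's C and gamma with scikit-learn's GridSearchCV (the grid values below are placeholders, not tuned settings; x_train, y_train, x_test, y_test come from the split above):

from sklearn.model_selection import GridSearchCV
from sklearn.svm import SVC

param_grid = {'C': [0.1, 1, 10, 100], 'gamma': ['scale', 0.01, 0.1, 1]}
search = GridSearchCV(SVC(kernel='rbf'), param_grid, cv=5)
search.fit(x_train, y_train)
print(search.best_params_, search.best_score_)        # best grid point and its CV score
print('test accuracy:', search.score(x_test, y_test))  # refit model evaluated on the test set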