acoused/Model/GrainSizeTools.py

287 lines
14 KiB
Python

# -*- coding: utf-8 -*-
"""
Created on Thur Jan 10 2019
@author: Adrien Vergne
"""
###############################################################################
# #
# GRAIN SIZE TOOLS #
# #
###############################################################################
# <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
# Adrien VERGNE - Irstea Lyon - 2019
# Program Python 3.5
# <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
# This script is designed for loading PSD data from Malvern Mastersizer
# Excek file output, as well as for fitting Gausian mixture models to
# the measured PSD
########################### Loading libraries #########################
import numpy as np # Numpy (array and matrix computing)
import pandas as pd # DataFrames
from math import sqrt, pi
from scipy.interpolate import Akima1DInterpolator # an interpolator
from sklearn.mixture import GaussianMixture # Gaussian mixture model (demodulation)
import matplotlib.pyplot as plt # matplotlib scientific plotting
########################### Loading Malvern PSD data #########################
class load_malvern:
"""This class loads Malvern particle size data from an Excel file"""
def __init__(self, folder, file):
# Excel file
self.file_folder = folder
self.file_name = file
self.file_path = folder + '/' + file
# Data
self.raw_data = None
self.size_inf = None
self.size_sup = None
self.size_center = None
self.size_proba = None
self.D10 = 0
self.D50 = 0
self.D90 = 0
# Loading data
self.load()
def load(self):
self.raw_data = pd.read_excel(self.file_path, 0)
self.size_inf = np.array(self.raw_data['Size x'][1:]) * 1e-6 / 2
self.size_sup = np.array(self.raw_data['Size y'][1:]) * 1e-6 / 2
self.size_center = (self.size_inf + self.size_sup)/2
self.size_proba = np.array(self.raw_data['% In Size'][1:])/100
self.D10 = self.raw_data['µm Diameter'][2]
self.D50 = self.raw_data['µm Diameter'][5]
self.D90 = self.raw_data['µm Diameter'][8]
########################### Fitting GMM #########################
def mix_gaussian_model(x, m_list, s_list, w_list):
"""This function computes a sum of gaussians"""
res = np.zeros(np.shape(x))
for i in range(len(m_list)):
res += w_list[i] * (1 / sqrt(2 * pi * s_list[i]**2) * np.exp(-(x - m_list[i])**2 / (2 * s_list[i]**2)))
return res
def simple_gaussian(x, m, s, w):
"""This function computes a simple guaussian"""
res = w * (1 / sqrt(2 * pi * s**2) * np.exp(-(x - m)**2 / (2 * s**2)))
return res
class demodul_granulo:
"""This class demodulates a grain size distribution
INPUTS:
@size_classes: center of each size class (radius in meters)
@proba_cumul_vol: cumulative volume probability of each size class
@interpolate_min_size: start (i.e. minimum size) of cumulative volume probability interpolation (radius in meters)
@interpolate_max_size: end (i.e. maximum size) of cumulative volume probability interpolation (radius in meters)
"""
def __init__(self, size_classes, proba_cumul_vol, interpolate_min_size, interpolate_max_size):
###### Data containers ######
# ~~~~~~~~ Inputs ~~~~~~~~~~#
# size classes (radius in meters)
self.size_classes = size_classes
# volume cumulative probability (from 0 to 1)
self.proba_cumul_vol = proba_cumul_vol
# start (i.e. minimum size) of cumulative volume probability interpolation
self.interpolate_min_size = interpolate_min_size
# end (i.e. maximum size) of cumulative volume probability interpolation
self.interpolate_max_size = interpolate_max_size
# ~~~~~~~~ Outputs ~~~~~~~~~~#
# Resampled size classes in logscale
self.sizes_log_resampled = np.array([])
# Interpolated cumulative probability
self.cumul_interpolated = np.array([])
# Size class volume probability (no cumul) from resampled size classes
self.resampled_proba_vol = np.array([])
# Size class number probability (no cumul) from resampled size classes
self.resampled_proba_num = np.array([])
# List of Gaussian mixture models (from 1 to 10 modes)
self.gaussian_mixture_model_list = []
# List of outputs data for each model (from 1 to 10 modes) --> see class "model_XX_modes_values"
self.demodul_data_list = []
# Computing
self.interpolate_data()
self.demodulate()
class model_XX_modes_values:
"""This sub-class is a data container for the outputs of a Gaussian Mixture Models of <nb_modes> modes"""
def __init__(self, x, nb_modes, mu_list, sigma_list, w_list):
# Number of modes
self.nb_modes = nb_modes
# List of mode centers (mu)
self.mu_list = mu_list
# List of mode scalling parameters (sigma)
self.sigma_list = sigma_list
# List of mode weights (w)
self.w_list = w_list
# Gaussian Mixture model values
self.model_total = mix_gaussian_model(x, self.mu_list, self.sigma_list, self.w_list) / \
np.sum(mix_gaussian_model(x, self.mu_list, self.sigma_list, self.w_list))
# list of single modes values
self.modes_list = []
for i in range(self.nb_modes):
# Computing the mode values
self.modes_list.append(simple_gaussian(x, self.mu_list[i], self.sigma_list[i], self.w_list[i]) /
np.sum(mix_gaussian_model(x, self.mu_list, self.sigma_list, self.w_list)))
def interpolate_data(self):
"""This method interpolates the cumulative probability in logscale, between the specified min and max size"""
# Phi steps
log_scale = 0.005
# Computing maximum phi (minimum diameter)
log_min = np.log(self.interpolate_min_size)
# Computing minimum phi (maximum diameter)
log_max = np.log(self.interpolate_max_size)
# Creating an array of equally spaced phi
self.sizes_log_resampled = np.arange(log_min, log_max, log_scale)
# Akima interpolation
f_logsize = Akima1DInterpolator(np.log(self.size_classes), self.proba_cumul_vol)
self.cumul_interpolated = f_logsize(self.sizes_log_resampled)
# Computing volume and number probability from interpolated data
self.resampled_proba_vol = np.abs(np.diff(self.cumul_interpolated)) # np.abs is added because probability p
# must be positive in np.random.choice
# used in function below "demodulate"
# Computing number probability,
ss = np.sum(self.resampled_proba_vol /(np.exp(self.sizes_log_resampled[:-1]))**3)
self.resampled_proba_num = (self.resampled_proba_vol / (np.exp(self.sizes_log_resampled[:-1]))**3) / ss
# fig, ax = plt.subplots(1, 2)
# ax[0].plot(list(range(len(self.cumul_interpolated))), self.cumul_interpolated)
# ax[0].set_xlabel("n° point")
# ax[0].set_ylabel("cumul interpolated")
#
# ax[1].plot(list(range(len(self.cumul_interpolated)-1)), self.resampled_proba_vol)
# ax[1].set_xlabel("n° point")
# ax[1].set_ylabel("diff cumul interpolated")
#
# plt.show()
def demodulate(self):
"""This function demodulates the interpolated probability distribution"""
# Sampling the size classes. Need to normalize the probability to get sum = 1.0
# print("**************************")
# for i in range(self.resampled_proba_vol.shape[0]):
# print(f"vol = {self.resampled_proba_vol[i]}, sum = {np.sum(self.resampled_proba_vol)}, "
# f" p = {self.resampled_proba_vol[i] / np.sum(self.resampled_proba_vol)}")
# print("**************************")
ech_demod = np.random.choice(self.sizes_log_resampled[:-1], (10000,1),
p=(self.resampled_proba_vol / np.sum(self.resampled_proba_vol)))
# fitting models from 1 to 10 components
N = np.arange(1, 11)
self.gaussian_mixture_model_list = [None for i in range(len(N))]
for i in range(len(N)):
self.gaussian_mixture_model_list[i] = GaussianMixture(n_components=N[i],
covariance_type='full').fit(ech_demod)
mu_list = [m[0] for m in self.gaussian_mixture_model_list[i].means_]
s_list = [sqrt(s[0][0]) for s in self.gaussian_mixture_model_list[i].covariances_]
w_list = [w for w in self.gaussian_mixture_model_list[i].weights_]
self.demodul_data_list.append(self.model_XX_modes_values(self.sizes_log_resampled, i + 1, mu_list, s_list, w_list))
def plot_interpolation(self):
"""This method plots the cumulative probability interpolation"""
# ------- Plotting interpolation----------- #
fig, ax = plt.subplots(1, 2, figsize = (11,4), dpi=100)
# Data
ax[0].plot(np.log(self.size_classes), self.proba_cumul_vol, linestyle='', marker='+')
ax[1].plot(np.log(self.size_classes), self.proba_cumul_vol, linestyle='', marker='+')
# Interpolation
ax[0].plot(self.sizes_log_resampled, self.cumul_interpolated, color='red')
ax[1].plot(self.sizes_log_resampled, self.cumul_interpolated, color='red')
ax[0].set_xlabel("Log(a)")
ax[0].set_ylabel("Cumulative fraction")
ax[1].set_yscale('log')
ax[1].set_ylim((1e-6, 2))
ax[1].set_xlabel("Log(a)")
ax[1].set_ylabel("Cumulative fraction")
ax[1].set_title('Double log scale')
plt.show()
def print_mode_data(self, n):
"""This method shows the parameters of the Gaussian Mixture Model of n modes"""
if (n <= 0) or (n >10):
print('Error: number of modes should be between 1 and 10')
else:
print('##### Number of modes: %d #####' %n)
model = self.gaussian_mixture_model_list[n - 1]
mode_center_txt = ''
sigma_txt = ''
weights_txt = ''
sorted_indices = []
for i in range(n):
sorted_indices.append(np.where(model.means_ == np.sort(model.means_, axis=0)[i])[0][0])
mode_center_txt = '%s %2.2f um' %(mode_center_txt, np.exp(model.means_[sorted_indices[i]])*1e6)
sigma_txt = '%s %2.2f' %(sigma_txt, sqrt(model.covariances_[sorted_indices[i]][0][0]))
weights_txt = '%s %2.1f %%' %(weights_txt, model.weights_[sorted_indices[i]]*100)
print('Centers (radius):', mode_center_txt)
print('Sigmas:', sigma_txt)
print('Weigths:', weights_txt)
def plot_modes(self, n):
"""This method plots the Gaussian Mixture Model of n components"""
if (n <= 0) or (n >10):
print('Error: number of modes should be between 1 and 10')
else:
# ---------------- Plotting ----------- #
fig,(ax)=plt.subplots(2, 1, figsize = (9,6), dpi=100)
# Data
ax[0].plot(np.exp(self.sizes_log_resampled[0:-1])*1e6, self.resampled_proba_vol,
label='Data - (proba vol.)', ls='--', linewidth=2, color='black')
ax[1].plot(np.exp(self.sizes_log_resampled[0:-1])*1e6, self.resampled_proba_vol,
label='Data - (proba vol.)', ls='--', linewidth=2, color='black')
# Gaussian mixture model
ax[0].plot(np.exp(self.sizes_log_resampled)*1e6,
self.demodul_data_list[n - 1].model_total, color='red', label='Model (%d modes)' %(n))
ax[1].plot(np.exp(self.sizes_log_resampled)*1e6,
self.demodul_data_list[n - 1].model_total, color='red', label='Model (%d modes)' %(n))
# Modes
for i in range(n):
sorted_indice = np.where(self.gaussian_mixture_model_list[n - 1].means_ ==
np.sort(self.gaussian_mixture_model_list[n - 1].means_, axis=0)[i])[0][0]
ax[0].plot(np.exp(self.sizes_log_resampled)*1e6,
self.demodul_data_list[n - 1].modes_list[sorted_indice]/np.sum(self.demodul_data_list[n - 1].model_total),
label='Mode %d' %(i+1), linestyle=':')
ax[1].plot(np.exp(self.sizes_log_resampled)*1e6,
self.demodul_data_list[n - 1].modes_list[sorted_indice]/np.sum(self.demodul_data_list[n - 1].model_total),
label='Mode %d' %(i+1), linestyle=':')
# Axes properties
ax[0].legend()
ax[0].set_xlabel('Radius (um)', fontsize=13, weight='bold')
ax[0].set_ylabel('Volume fraction', fontsize=13, weight='bold')
ax[0].set_title('Gaussian Mixture Model -- %d modes' %(n), fontsize=14, weight='bold')
ax[1].legend(loc=0)
ax[1].set_xlabel('Radius (um)', fontsize=13, weight='bold')
ax[1].set_ylabel('Volume fraction', fontsize=13, weight='bold')
ax[1].set_xscale('log')
fig.tight_layout()
plt.show()