# data_parsing.py -- parsers for JCAMP-DX and CSV spectral data files
from packages import *
import jcamp as jc

class JcampParser:
    """Retrieve spectral data and sample metadata from a JCAMP-DX file.

    The file is read with ``jc.jcamp_readfile``; every entry of the returned
    ``'children'`` list is treated as one scanned sample.

    Attributes:
        idx: ``np.arange`` over the number of blocks announced by the file.
        metadata_: DataFrame with one row per block (row labels ``'0'``,
            ``'1'``, ...) holding the header fields collected by
            ``_block_metadata``.
        spectra: DataFrame of the ``'y'`` values, one row per block, indexed
            by the block names. Columns are the ``'x'`` values of the first
            block in reversed order (the ``'y'`` values are flipped to match).
        chem_data: DataFrame of concentrations as floats, one row per block
            and one column per element name found in the first block.
        pattern: regular expression matching one ``(name,value,rest`` entry
            of a concentrations record.
    """

    # Group 1 is the element name, group 2 the (unsigned decimal) value.
    pattern = r"\(([^,]+),(\d+(\.\d+)?),([^)]+)"

    def __init__(self, path):
        """Load the file at *path* and build the spectra/metadata tables.

        Args:
            path: location of the JCAMP-DX file, passed unchanged to
                ``jc.jcamp_readfile``.

        Raises:
            IndexError: if the file contains no child block.
            KeyError: if a block lacks one of the expected header fields.
        """
        self.__path = path
        self.__dxfile = jc.jcamp_readfile(self.__path)

        # Access samples data
        self.__nb = self.__dxfile['blocks']  # number of blocks announced by the file
        self.__list_of_blocks = self.__dxfile['children']  # one entry per block
        self.__wl = self.__list_of_blocks[0]["x"]  # x axis, taken from the first block

        # Preallocate the spectra matrix: one row per block, as many columns
        # as the first block has 'y' values.
        specs = np.zeros((self.__nb, len(self.__list_of_blocks[0]["y"])), dtype=float)
        self.idx = np.arange(self.__nb)
        self.__met = {}
        for i in range(self.__nb):
            block = self.__list_of_blocks[i]
            specs[i] = block['y']
            self.__met[f'{i}'] = self._block_metadata(block)

        self.metadata_ = DataFrame(self.__met).T
        # Spectra and x axis are both reversed so that they stay aligned.
        self.spectra = DataFrame(np.fliplr(specs), columns=self.__wl[::-1],
                                 index=self.metadata_['name'])

        # Element names come from the first block only; every other block is
        # expected to list its concentrations in the same order.
        first_record = self._drop_undefined(self.__list_of_blocks[0]['concentrations'])
        elements_name = [match[0] for match in re.findall(self.pattern, first_record)]

        # Concentrations of every sample, keyed like the rows of ``metadata_``
        cc = {label: self.conc(record)
              for label, record in self.metadata_['concentrations'].items()}

        self.chem_data = DataFrame(cc, index=elements_name).T.astype(float)
        self.chem_data.index = self.metadata_['name']

    @staticmethod
    def _drop_undefined(record):
        """Return *record* without the lines containing "NCU" or "<<undef>>"."""
        return '\n'.join(line for line in record.split('\n')
                         if "NCU" not in line and "<<undef>>" not in line)

    @staticmethod
    def _block_metadata(block):
        """Collect the header fields of one block into a plain dict."""
        # The instrument record is split on "\n$$"; the scan count and the
        # resolution are read as the right-hand side of "key=value" items at
        # fixed positions 6 and 8.
        # NOTE(review): these positions presumably match one specific
        # instrument export -- confirm before feeding other files.
        system = block['spectrometer/data system'].split('\n$$')
        return {
            'name': block['title'],
            'origin': block['origin'],
            'date': block['date'],
            'spectrometer': system[0],
            'n_scans': system[6].split('=')[1],
            'resolution': system[8].split('=')[1],
            'xunits': block['xunits'],
            'yunits': block['yunits'],
            'firstx': block['firstx'],
            'lastx': block['lastx'],
            'npoints': block['npoints'],
            'concentrations': block['concentrations'],
        }

    def conc(self, sample):
        """Extract the concentration values of a single sample.

        Args:
            sample: raw concentrations record of one block (multi-line string).

        Returns:
            numpy array with the matched values, still as strings, in the
            order they appear in *sample*. Lines containing "NCU" or
            "<<undef>>" are ignored.
        """
        prep = self._drop_undefined(sample)
        return np.array([match[1] for match in re.findall(self.pattern, prep)])

    @property
    def specs_df_(self):
        """Spectra table (same object as ``spectra``)."""
        return self.spectra

    @property
    def md_df_(self):
        """Metadata without the raw concentrations and without the columns
        whose values are all empty strings."""
        me = self.metadata_.drop("concentrations", axis=1)
        me = me.drop(me.columns[(me == '').all()], axis=1)
        return me

    @property
    def md_df_st_(self):
        """The ``origin`` and ``date`` columns of the metadata."""
        rt = ['origin', 'date']
        return self.metadata_.loc[:, rt]

    @property
    def chem_data_(self):
        """Concentration table (same object as ``chem_data``)."""
        return self.chem_data
    


class CsvParser:
    """Load an uploaded CSV file and split it into float and non-float columns.

    The field separator (``;`` or ``,``) and the decimal mark (``.`` or ``,``)
    are guessed from the first three lines of the file.

    Attributes:
        file: path of the temporary on-disk copy of the uploaded content.
    """

    def __init__(self, file):
        """Copy the content of *file* to a temporary file.

        Args:
            file: binary file-like object; the result of ``file.read()`` is
                written unchanged to the temporary file.
        """
        from tempfile import NamedTemporaryFile

        # delete=False keeps the copy on disk after the ``with`` block so that
        # ``parse`` can reopen it by name; nothing in this class removes it.
        with NamedTemporaryFile(delete=False, suffix=".dx") as tmp:
            tmp.write(file.read())
            self.file = tmp.name

    def _sniff_dialect(self):
        """Guess ``(separator, decimal)`` from the first three lines of the file."""
        with open(self.file, mode='r') as csvfile:
            head = [csvfile.readline() for _ in range(3)]

        dots = [line.count('.') for line in head]
        commas = [line.count(',') for line in head]
        semicolons = [line.count(';') for line in head]

        # The more frequent candidate wins; ';' is the fallback when neither
        # dominates (including a non-zero tie, which used to leave the
        # separator undefined).
        separator = ',' if sum(commas) > sum(semicolons) else ';'

        # The first line is left out of the decimal vote.
        decimal = ',' if sum(commas[1:]) > sum(dots[1:]) else '.'

        # NOTE(review): the second condition resets the decimal mark to '.'
        # whenever the six dot/comma counts take at most two distinct values;
        # kept as is, but confirm that this is the intended heuristic.
        if decimal == separator or len(set(dots + commas)) <= 2:
            decimal = '.'
        return separator, decimal

    def parse(self):
        """Read the file and return its float and non-float columns.

        The first row is always used as the header. The first column becomes
        the index only when it is not numeric and all its values (header cell
        excluded) are distinct.

        Returns:
            tuple ``(float_data, non_float)`` of DataFrames. A part without
            any column is returned as an empty ``pd.DataFrame()``.
        """
        import pandas as pd
        from pandas.api.types import is_float_dtype

        separator, decimal = self._sniff_dialect()
        header = 0

        # First pass without header/index, only to inspect the first column.
        raw = pd.read_csv(self.file, decimal=decimal, sep=separator,
                          header=None, index_col=None)
        first_col = raw.iloc[1:, 0]

        if is_float_dtype(first_col):
            index_col = None
        else:
            try:
                first_col.to_numpy().astype(float)
            except (ValueError, TypeError):
                # Not numeric: count the distinct labels.
                n_labels = len(set(first_col))
            else:
                # Numeric values are data, not labels.
                n_labels = 0
            index_col = 0 if n_labels == len(first_col) else None

        df = pd.read_csv(self.file, decimal=decimal, sep=separator,
                         header=header, index_col=index_col)

        non_float = df.select_dtypes(exclude='float')
        if non_float.shape[1] == 0:
            non_float = pd.DataFrame()

        float_data = df.select_dtypes(include='float')
        if float_data.shape[1] == 0:
            float_data = pd.DataFrame()

        return float_data, non_float
            






# ############## new function
# def csv_loader(file):
#     import clevercsv
#     import numpy as np
#     import pandas as pd

#     dec_dia = ['.',',']
#     sep_dia = [',',';']
#     dec, sep = [], []
#     with open(file, mode = 'r') as csvfile:
#         lines = [csvfile.readline() for i in range(3)]
#         for i in lines:
#             for j in range(2):
#                 dec.append(i.count(dec_dia[j]))
#                 sep.append(i.count(sep_dia[j]))
            
#     if dec[0] != dec[2]:
#         header = 0
#     else:
#         header = 0


#     semi = np.sum([sep[2*i+1] for i in range(3)])
#     commas = np.sum([sep[2*i] for i in range(3)])

#     if semi>commas:separator = ';'
#     elif semi<commas: separator = ','
    
#     elif semi ==0 and commas == 0: separator = ';'
    

#     commasdec = np.sum([dec[2*i+1] for i in range(1,3)])
#     dot = np.sum([dec[2*i] for i in range(1,3)])
#     if commasdec>dot:decimal = ','
#     elif commasdec<=dot:decimal = '.'
    
#     if decimal == separator or len(np.unique(dec)) <= 2:
#         decimal = "."
    
#     df = pd.read_csv(file, decimal=decimal, sep=separator, header=None, index_col=None)
#     try:
#         rat = np.mean(df.iloc[0,50:60]/df.iloc[5,50:60])>10
#         header = 0 if rat or np.nan else None
#     except:
#         header = 0

#     from pandas.api.types import is_float_dtype

#     if is_float_dtype(df.iloc[1:,0]):
#         index_col = None
#     else:
#         try:
#             te = df.iloc[1:,0].to_numpy().astype(float).dtype
            
#         except:
#             te = set(df.iloc[1:,0])

#         if len(te) == df.shape[0]-1:
#             index_col = 0
#         elif len(te) < df.shape[0]-1:
#             index_col = None
#         else:
#             index_col = None

#     # index_col = 0 if len(set(df.iloc[1:,0])) == df.shape[0]-1 and is_float_dtype(df.iloc[:,0])==False else None
#     df = pd.read_csv(file, decimal=decimal, sep=separator, header=header, index_col=index_col)
#     # st.write(decimal, separator, index_col, header)
    
#     if df.select_dtypes(exclude='float').shape[1] >0:
#         non_float = df.select_dtypes(exclude='float')
        
#     else:
#         non_float = pd.DataFrame()


#     if df.select_dtypes(include='float').shape[1] >0:
#         float_data = df.select_dtypes(include='float')
        
#     else:
#         float_data = pd.DataFrame()
#     return float_data, non_float