Skip to content
Snippets Groups Projects
data_parsing.py 9.39 KiB
Newer Older
  • Learn to ignore specific revisions
  • DIANE's avatar
    DIANE committed
    from packages import *
    import jcamp as jc
    
    class JcampParser:
    
    DIANE's avatar
    DIANE committed
        import jcamp
    
    DIANE's avatar
    DIANE committed
        '''This class is designed to help retrieve spectral data as well as metadata of samples from a JCAMP file'''
        def __init__(self, path):
            """Load a JCAMP file and build the spectra, metadata and chemistry tables.

            Parameters
            ----------
            path
                Location of the JCAMP file; passed unchanged to
                ``jc.jcamp_readfile``.

            Attributes set
            --------------
            idx : numpy.ndarray
                ``np.arange`` over the number of blocks.
            metadata_ : DataFrame
                One row per block (row labels ``'0'``, ``'1'``, ...), one column
                per collected header field.
            spectra : DataFrame
                One row per block, indexed by the block titles; the columns are
                the x values of the first block in reversed order.
            pattern : str
                Regular expression applied to the ``concentrations`` field.
            chem_data : DataFrame
                Concentrations cast to float, one row per block and one column
                per element name found in the first block.
            """
            self.__path = path
            self.__dxfile = jc.jcamp_readfile(self.__path)

            # The reader output is indexed with 'blocks' (used as the number of
            # samples) and 'children' (the list of per-sample blocks).
            self.__nb = self.__dxfile['blocks']
            self.__list_of_blocks = self.__dxfile['children']
            first_block = self.__list_of_blocks[0]
            # The x axis of the first block labels the columns of every spectrum.
            self.__wl = first_block["x"]

            # Preallocated matrix: one row per block, one column per y value.
            specs = np.zeros((self.__nb, len(first_block["y"])), dtype=float)
            self.idx = np.arange(self.__nb)
            self.__met = {}
            for i in range(self.__nb):
                block = self.__list_of_blocks[i]
                specs[i] = block['y']

                # The instrument description is a single string whose entries
                # are separated by '\n$$'; split it once and pick entries by
                # position.
                # NOTE(review): positions 6 and 8 are assumed to hold
                # 'key=value' entries for scans and resolution -- confirm
                # against the instrument's export format.
                system_fields = block['spectrometer/data system'].split('\n$$')

                # Header fields deliberately left out (commented out in the
                # original): time, instrumental parameters, xfactor, yfactor,
                # firsty, miny, maxy, deltax.
                self.__met[f'{i}'] = {
                    'name': block['title'],
                    'origin': block['origin'],
                    'date': block['date'],
                    'spectrometer': system_fields[0],
                    'n_scans': system_fields[6].split('=')[1],
                    'resolution': system_fields[8].split('=')[1],
                    'xunits': block['xunits'],
                    'yunits': block['yunits'],
                    'firstx': block['firstx'],
                    'lastx': block['lastx'],
                    'npoints': block['npoints'],
                    'concentrations': block['concentrations'],
                }

            self.metadata_ = DataFrame(self.__met).T
            # Both the spectra columns and the x axis are reversed together, so
            # each value stays attached to its own x label.
            self.spectra = DataFrame(
                np.fliplr(specs),
                columns=self.__wl[::-1],
                index=self.metadata_['name'],
            )

            #### Concentrations
            self.pattern = r"\(([^,]+),(\d+(\.\d+)?),([^)]+)"

            # Element names come from the first block only; lines mentioning
            # "NCU" or "<<undef>>" are ignored, exactly as in `conc`.
            reference = '\n'.join(
                line
                for line in first_block['concentrations'].split('\n')
                if "NCU" not in line and "<<undef>>" not in line
            )
            elements_name = [match[0] for match in re.findall(self.pattern, reference)]

            # One array of concentration values per sample, keyed by the
            # metadata row label.
            raw_concentrations = self.metadata_['concentrations']
            cc = {label: self.conc(text) for label, text in raw_concentrations.items()}

            # Table of chemical data: samples as rows, elements as columns.
            self.chem_data = DataFrame(cc, index=elements_name).T.astype(float)
            self.chem_data.index = self.metadata_['name']
    
        ### Method for retrieving the concentration of a single sample
        def conc(self,sample):
            """Extract the concentration values of a single sample.

            Lines of ``sample`` that contain ``NCU`` or ``<<undef>>`` are dropped.
            ``self.pattern`` is then applied to the remaining text and the second
            capture group of every match is collected.

            Returns a NumPy array holding the captured values (still as strings).
            """
            kept_lines = []
            for line in sample.split('\n'):
                if "NCU" in line or "<<undef>>" in line:
                    continue
                kept_lines.append(line)
            cleaned = '\n'.join(kept_lines)
            return np.array([groups[1] for groups in re.findall(self.pattern, cleaned)])
    
        @property
        def specs_df_(self):
            """Return the spectra DataFrame stored in ``self.spectra``."""
            return self.spectra
        @property
        def md_df_(self):
            """Return the metadata without concentrations and without blank columns.

            The ``concentrations`` column is removed first; then every column
            whose values are all the empty string ``''`` is removed as well.
            """
            trimmed = self.metadata_.drop("concentrations", axis = 1)
            blank_mask = (trimmed == '').all()
            return trimmed.drop(trimmed.columns[blank_mask], axis = 1)
        @property
        def md_df_st_(self):
            """Return only the ``origin`` and ``date`` columns of the metadata."""
            return self.metadata_.loc[:, ['origin', 'date']]
                 
        @property
        def chem_data_(self):
             """Return the chemical data DataFrame stored in ``self.chem_data``."""
             return self.chem_data
        
    
    
    class CsvParser:
    
    DIANE's avatar
    DIANE committed
        import clevercsv
        import numpy as np
    
    
        def __init__(self, file):
            """Copy the content of ``file`` into a named temporary file.

            ``file`` must provide a ``read()`` method. Its content is written to a
            temporary file created with ``delete=False`` and the ``.dx`` suffix,
            and the path of that file is stored in ``self.file`` for later reading.
            """
            tmp = NamedTemporaryFile(delete = False, suffix = ".dx")
            with tmp:
                tmp.write(file.read())
                self.file = tmp.name
    
        
        def parse(self):
            """Read the stored CSV file and split its columns by dtype.

            The delimiter and the decimal mark are guessed by counting ``,``,
            ``;`` and ``.`` in the first three lines. The file is read a first
            time without header or index to decide whether the first column
            should become the index, then read again with the final settings.

            Returns
            -------
            tuple of (DataFrame, DataFrame)
                ``(float_data, non_float)``: the float-typed columns and all the
                other columns. When a selection has no column, an empty
                ``pd.DataFrame()`` is returned in its place.
            """
            import pandas as pd
            from pandas.api.types import is_float_dtype

            with open(self.file, mode = 'r') as csvfile:
                head = [csvfile.readline() for _ in range(3)]

            # Delimiter: whichever of ',' and ';' is more frequent in the first
            # three lines. A tie (including "neither appears") falls back to
            # ';'. Previously a non-zero tie left `separator` unassigned and
            # raised UnboundLocalError.
            commas = sum(line.count(',') for line in head)
            semi = sum(line.count(';') for line in head)
            separator = ',' if commas > semi else ';'

            # Decimal mark: only the second and third lines vote, the first
            # line being treated as the header.
            dot = sum(line.count('.') for line in head[1:])
            commasdec = sum(line.count(',') for line in head[1:])
            decimal = ',' if commasdec > dot else '.'

            # Fall back to '.' when the guess collides with the delimiter, or
            # when the per-line counts of '.' and ',' take at most two distinct
            # values.
            dec_counts = [line.count(mark) for line in head for mark in ('.', ',')]
            if decimal == separator or len(set(dec_counts)) <= 2:
                decimal = "."

            # First pass: raw read, used only to inspect the first column.
            probe = pd.read_csv(self.file, decimal=decimal, sep=separator, header=None, index_col=None)

            # NOTE(review): the original computed a row-0 / row-5 ratio over
            # columns 50-59 and then evaluated `0 if rat or np.nan else None`.
            # Since `np.nan` is truthy that expression always yields 0 (and the
            # error path also set 0), so the first row has always been used as
            # the header. This is kept as-is; confirm whether header-less files
            # should be supported before reinstating a heuristic.
            header = 0

            first_col = probe.iloc[1:, 0]
            n_data_rows = probe.shape[0] - 1
            if is_float_dtype(first_col):
                index_col = None
            else:
                try:
                    first_col.to_numpy().astype(float)
                except (ValueError, TypeError):
                    # Not numeric: count the distinct labels.
                    n_labels = len(set(first_col))
                else:
                    # Numeric-looking column. The original compared the length
                    # of the converted array's dtype (0 for a plain float
                    # dtype) with the row count, so the outcome is preserved by
                    # using 0 here.
                    n_labels = 0
                # The first column becomes the index only when every data row
                # has its own distinct label.
                index_col = 0 if n_labels == n_data_rows else None

            # Second pass: final read with the detected settings.
            df = pd.read_csv(self.file, decimal=decimal, sep=separator, header=header, index_col=index_col)

            non_float = df.select_dtypes(exclude='float')
            if non_float.shape[1] == 0:
                non_float = pd.DataFrame()

            float_data = df.select_dtypes(include='float')
            if float_data.shape[1] == 0:
                float_data = pd.DataFrame()

            return float_data, non_float
                
    
    
    
    
    
    
    # ############## new function
    # def csv_loader(file):
    #     import clevercsv
    #     import numpy as np
    #     import pandas as pd
    
    #     dec_dia = ['.',',']
    #     sep_dia = [',',';']
    #     dec, sep = [], []
    #     with open(file, mode = 'r') as csvfile:
    #         lines = [csvfile.readline() for i in range(3)]
    #         for i in lines:
    #             for j in range(2):
    #                 dec.append(i.count(dec_dia[j]))
    #                 sep.append(i.count(sep_dia[j]))
                
    #     if dec[0] != dec[2]:
    #         header = 0
    #     else:
    #         header = 0
    
    
    #     semi = np.sum([sep[2*i+1] for i in range(3)])
    #     commas = np.sum([sep[2*i] for i in range(3)])
    
    #     if semi>commas:separator = ';'
    #     elif semi<commas: separator = ','
        
    #     elif semi ==0 and commas == 0: separator = ';'
        
    
    #     commasdec = np.sum([dec[2*i+1] for i in range(1,3)])
    #     dot = np.sum([dec[2*i] for i in range(1,3)])
    #     if commasdec>dot:decimal = ','
    #     elif commasdec<=dot:decimal = '.'
        
    #     if decimal == separator or len(np.unique(dec)) <= 2:
    #         decimal = "."
        
    #     df = pd.read_csv(file, decimal=decimal, sep=separator, header=None, index_col=None)
    #     try:
    #         rat = np.mean(df.iloc[0,50:60]/df.iloc[5,50:60])>10
    #         header = 0 if rat or np.nan else None
    #     except:
    #         header = 0
    
    #     from pandas.api.types import is_float_dtype
    
    #     if is_float_dtype(df.iloc[1:,0]):
    #         index_col = None
    #     else:
    #         try:
    #             te = df.iloc[1:,0].to_numpy().astype(float).dtype
                
    #         except:
    #             te = set(df.iloc[1:,0])
    
    #         if len(te) == df.shape[0]-1:
    #             index_col = 0
    #         elif len(te) < df.shape[0]-1:
    #             index_col = None
    #         else:
    #             index_col = None
    
    #     # index_col = 0 if len(set(df.iloc[1:,0])) == df.shape[0]-1 and is_float_dtype(df.iloc[:,0])==False else None
    #     df = pd.read_csv(file, decimal=decimal, sep=separator, header=header, index_col=index_col)
    #     # st.write(decimal, separator, index_col, header)
        
    #     if df.select_dtypes(exclude='float').shape[1] >0:
    #         non_float = df.select_dtypes(exclude='float')
            
    #     else:
    #         non_float = pd.DataFrame()
    
    
    #     if df.select_dtypes(include='float').shape[1] >0:
    #         float_data = df.select_dtypes(include='float')
            
    #     else:
    #         float_data = pd.DataFrame()
    #     return float_data, non_float