neuroQWERTY MIT-CSXPD Dataset 1.0.0

File: <base>/nqDataLoader.py (10,023 bytes)
# -*- coding: utf-8 -*-

# set modules  dir
import numpy as np
import sys, os, re, datetime


class NqDataLoader:
    """
    Loader and filter for keystroke CSV files.

    After a successful loadDataFile() call the instance holds parallel numpy
    arrays: dataKeys (key labels), dataHT (hold times), dataTimeStart /
    dataTimeEnd (press / release timestamps) and dataFT (difference between
    consecutive press timestamps, padded with a trailing 0).
    """
    # Bit flags accepted by filtData(); combine them with "|".
    FLT_NO_MOUSE = 1 << 0
    FLT_NO_LETTERS = 1 << 1
    FLT_NO_BACK = 1 << 2
    FLT_NO_SHORT_META = 1 << 3    # space, enter, arrows, etc.
    FLT_NO_LONG_META = 1 << 4     # shift, control, alt, etc.
    FLT_NO_PUNCT = 1 << 5

    # (flag, pattern) pairs used by filtData(). The patterns include the
    # double quotes that surround every key label (e.g. '"BackSpace"') and
    # are applied with re.match, i.e. anchored at the start of the label only.
    _FILTERS = (
        (FLT_NO_MOUSE, re.compile(r'("mouse.+")')),
        (FLT_NO_LETTERS, re.compile(r'(".{1}")')),
        (FLT_NO_BACK, re.compile(r'("BackSpace")')),
        (FLT_NO_SHORT_META, re.compile(r'("space")|("Num_Lock")|("Return")|("P_Enter")|("Caps_Lock")|("Left")|("Right")|("Up")|("Down")')),
        (FLT_NO_LONG_META, re.compile(r'("Shift.+")|("Alt.+")|("Control.+")')),
        (FLT_NO_PUNCT, re.compile(r'("more")|("less")|("exclamdown")|("comma")|("\[65027\]")|("\[65105\]")|("ntilde")|("minus")|("equal")|("bracketleft")|("bracketright")|("semicolon")|("backslash")|("apostrophe")|("comma")|("period")|("slash")|("grave")')),
    )

    # Text types the key patterns can match directly
    # (str on Python 3, str and unicode on Python 2).
    _TEXT_TYPES = (str, type(u''))

    def __init__(self):
        self.dataKeys = None
        self.dataHT = None
        self.dataTimeStart = None
        self.dataTimeEnd = None
        self.dataFT = None    # set by loadDataFile()
        self.lbl = None       # keep-mask of the last filtData() call

    @staticmethod
    def _keyText(key):
        """
        Return a key label as text so the key patterns can be applied to it.

        Text is returned unchanged, bytes are decoded, and anything else
        (e.g. the numeric placeholder keys of the 'si' format) is converted
        with str().
        """
        if isinstance(key, NqDataLoader._TEXT_TYPES):
            return key
        if isinstance(key, bytes):
            # latin-1 maps every byte value, so decoding cannot fail
            return key.decode('latin-1')
        return str(key)

    def sanityCheck(self):
        """
        Remove implausible keystrokes from the member arrays (in place).

        An entry is dropped when its press or release timestamp is <= 0,
        when its hold time is negative or >= 5, or when its press timestamp
        is lower than the largest press timestamp that precedes it
        (out-of-order entry).

        :return: the number of entries removed
        """
        assert( self.dataKeys is not None and len(self.dataKeys) > 0 )
        assert( self.dataHT is not None and len(self.dataHT) > 0 )
        assert( self.dataTimeStart is not None and len(self.dataTimeStart) > 0 )
        assert( self.dataTimeEnd is not None and len(self.dataTimeEnd) > 0 )

        badLbl = self.dataTimeStart <= 0
        badLbl = np.bitwise_or(badLbl, self.dataTimeEnd <= 0)
        badLbl = np.bitwise_or(badLbl, self.dataHT < 0)
        badLbl = np.bitwise_or(badLbl, self.dataHT >= 5)

        #----- remove non consecutive start times
        # Repeatedly flag every entry that starts before its predecessor and
        # overwrite it (in a scratch copy) with the predecessor's value, until
        # the scratch copy is non-decreasing.
        startTmpArr = self.dataTimeStart.copy()
        nonConsLbl = np.zeros(len(startTmpArr), dtype=bool)
        while True:
            # the first entry has no predecessor and is never flagged
            stepLbl = np.append([False], np.diff(startTmpArr) < 0)
            if not stepLbl.any():
                break
            # keep track of the indices to remove
            nonConsLbl = np.bitwise_or(nonConsLbl, stepLbl)
            # change the values in the temporary array
            indicesToChange = np.flatnonzero(stepLbl)
            startTmpArr[indicesToChange] = startTmpArr[indicesToChange - 1]

        badLbl = np.bitwise_or(badLbl, nonConsLbl)
        #-----

        # invert bad labels
        goodLbl = np.bitwise_not(badLbl)

        self.dataKeys = self.dataKeys[goodLbl]
        self.dataHT = self.dataHT[goodLbl]
        self.dataTimeStart = self.dataTimeStart[goodLbl]
        self.dataTimeEnd = self.dataTimeEnd[goodLbl]

        return int(np.count_nonzero(badLbl))

    def loadDataFile(self, fileIn, autoFilt=True, impType=None, debug=False):
        """
        Load a raw keystroke CSV file into the member arrays.

        :param fileIn: path of the CSV file
        :param autoFilt: PD format only; when True, mouse and long meta keys
                         are filtered out after loading
        :param impType: 'si' for the sleep inertia format (integer press and
                        release timestamps in columns 0 and 1), anything else
                        for the PD format
        :param debug: PD format only; print how many entries sanityCheck()
                      removed
        :return: True on success, or an error message string when reading
                 the file raises IOError
        """
        try:
            if impType == 'si':  # Sleep inertia format
                data = np.genfromtxt(fileIn, dtype=np.int64, delimiter=',', skip_header=0)
                # Shift so the smallest value in the file becomes 0, then scale
                # by 1/1000 (presumably ms -> s; confirm against the data spec).
                data = data - data.min()
                data = data.astype(np.float64) / 1000
                self.dataTimeStart = data[:,0]
                self.dataTimeEnd = data[:,1]
                self.dataHT = self.dataTimeEnd - self.dataTimeStart
                # Placeholder keys: this format has no key column, but
                # sanityCheck() requires a non-empty dataKeys array.
                self.dataKeys = np.zeros(len(self.dataHT))
                # NOTE: an entry whose shifted press timestamp is 0 is removed
                # here, because sanityCheck() drops timestamps <= 0.
                self.sanityCheck()
            else:  # PD format
                data = np.genfromtxt(fileIn, dtype=None, delimiter=',', skip_header=0)
                # column 0: key, 1: hold time, 2: release time, 3: press time
                self.dataKeys = data['f0']
                self.dataHT = data['f1']
                self.dataTimeStart = data['f3']
                self.dataTimeEnd = data['f2']
                remNum = self.sanityCheck()

                if debug:
                    print('removed  {0}  elements'.format(remNum))

                if autoFilt:
                    self.filtData(self.FLT_NO_MOUSE | self.FLT_NO_LONG_META)

            # flight time: difference between consecutive press timestamps,
            # padded with a trailing 0 so it has the same length as the others
            self.dataFT = np.append(np.diff(self.dataTimeStart), 0)

            return True
        except IOError:
            return 'file {:s} not found'.format(fileIn)

    def loadDataArr(self, lstArr):
        """
        Load keystrokes from a sequence of comma separated text rows.

        Each row is split on ',' and its first four tokens are stored as
        key, hold time, press timestamp and release timestamp. Neither
        sanityCheck() nor the flight time computation is run.

        NOTE(review): the arrays created here are column vectors of shape
        (n, 1), whereas loadDataFile() produces 1-D arrays, and tokens 2/3 are
        read as press/release while the PD branch of loadDataFile() reads
        column 3 as press and column 2 as release - confirm which is intended.

        :param lstArr: sequence of strings, one keystroke per string
        """
        rowNum = len(lstArr)
        self.dataKeys = np.zeros((rowNum, 1), dtype='S30')
        self.dataHT = np.zeros((rowNum, 1))
        self.dataTimeStart = np.zeros((rowNum, 1))
        self.dataTimeEnd = np.zeros((rowNum, 1))
        for i, row in enumerate(lstArr):
            tok = row.split(',')
            self.dataKeys[i] = str(tok[0])
            self.dataHT[i] = float(tok[1])
            self.dataTimeStart[i] = float(tok[2])
            self.dataTimeEnd[i] = float(tok[3])

    def filtData(self, flags):
        """
        Drop, in place, every keystroke whose key label falls in one of the
        categories selected by flags.

        dataKeys, dataHT, dataTimeStart and dataTimeEnd are reduced with the
        same boolean keep-mask, which is also stored in self.lbl. dataFT is
        not modified. Key labels may be text, bytes or non-string placeholders
        (the latter never match any category).

        :param flags: bitwise OR of the FLT_NO_* constants
        :return: None
        """
        # np.ravel lets the (n, 1) key arrays built by loadDataArr() be
        # iterated element by element, exactly like 1-D key arrays.
        keyTexts = [self._keyText(k) for k in np.ravel(self.dataKeys)]

        #-- create mask labels
        lbl = np.ones(len(keyTexts), dtype=bool)
        for flag, pattern in self._FILTERS:
            if flags & flag:
                # explicit bool dtype keeps the mask valid for empty input
                lbl &= np.array([pattern.match(k) is None for k in keyTexts], dtype=bool)
        #--

        self.lbl = lbl

        self.dataKeys = self.dataKeys[lbl]
        self.dataHT = self.dataHT[lbl]
        self.dataTimeStart = self.dataTimeStart[lbl]
        self.dataTimeEnd = self.dataTimeEnd[lbl]

    def getStdVariablesFilt(self, fileIn, impType=None):
        """
        Load the raw typing file into this object and return the filtered
        variables (no mouse clicks, no long meta buttons, no backspaces).

        :param fileIn: location of the raw typing file
        :param impType: file format, forwarded to loadDataFile()
        :return: tuple (array of keys, array of hold times, array of press
                 event timestamps, array of release event timestamps)
        :raises IOError: when loadDataFile() reports that the file could not
                         be read
        """
        res = self.loadDataFile(fileIn, False, impType)
        # check the load result before filtering: after a failed load the
        # member arrays may be unset
        if res is not True:
            raise IOError(res)
        # remove delete button
        self.filtData(self.FLT_NO_MOUSE | self.FLT_NO_LONG_META | self.FLT_NO_BACK)

        return self.dataKeys, self.dataHT, self.dataTimeStart, self.dataTimeEnd


def getDataFiltHelper( fileIn, impType=None ):
    """
    Helper method to load filtered keypress data from given file.

    A fresh NqDataLoader is used; mouse events, long meta keys and backspaces
    are filtered out.

    :param fileIn: path to csv keypress file 
    :param impType: format of the csv file ('si': for sleep inertia data, None for PD data)
    :return: tuple of arrays (dataKeys, dataHT, dataTimeStart, dataTimeEnd)
    :raises IOError: when the loader reports that the file could not be read
    """
    nqObj = NqDataLoader()
    res = nqObj.loadDataFile( fileIn, False, impType )
    # Check the load result before filtering: after a failed load the loader's
    # arrays are still None and filtData() would fail with an unrelated error.
    if res is not True:
        raise IOError(res)
    # remove delete button
    nqObj.filtData( nqObj.FLT_NO_MOUSE | nqObj.FLT_NO_LONG_META | nqObj.FLT_NO_BACK )

    return nqObj.dataKeys, nqObj.dataHT, nqObj.dataTimeStart, nqObj.dataTimeEnd
    
    
def genFileStruct( dataDir, maxRepNum=4 ):
    '''
    Generate a dictionary with the NQ file list and test date (legacy method)

    File names are expected to start with
    <timestamp>.<pID>_<repID>_<expID>.csv (all four fields are digits).
    Names that do not match are reported on stdout and skipped.

    :param dataDir: base directory containing the CSV files (with or without
                    a trailing path separator)
    :param maxRepNum: integer with the maximum repetition number; every
                      patient gets an entry for each repetition 1..maxRepNum
                      even if no file exists for it
    :return: two dictionaries: fMap, dateMap = NQ file/date list[pID][repID][expID]
    '''
    fMap = {} # data container
    dateMap = {}
    # re.match anchors the pattern at the start of the file name only
    p = re.compile( r'([0-9]+)\.{1}([0-9]+)_([0-9]+)_([0-9]+)\.csv' )
    for f in os.listdir( dataDir ):
        m = p.match( f )
        if not m:
            print('{0}  no'.format(f))
            continue

        # file found
        timeStamp = int(m.group(1))
        pID = int(m.group(2))
        repID = int(m.group(3))
        expID = int(m.group(4))
        # store new patient, pre-populating the expected repetitions
        if pID not in fMap:
            fMap[pID] = dict((tmpRid, {}) for tmpRid in range(1, maxRepNum + 1))
            dateMap[pID] = dict((tmpRid, {}) for tmpRid in range(1, maxRepNum + 1))
        # store data; setdefault keeps files whose repetition number lies
        # outside 1..maxRepNum instead of failing with a KeyError
        fMap[pID].setdefault(repID, {})[expID] = os.path.join(dataDir, f)
        # naive local-time datetime built from the file name's timestamp
        dateMap[pID].setdefault(repID, {})[expID] = datetime.datetime.fromtimestamp(timeStamp)

    return fMap, dateMap