#! /usr/bin/python
#-*-coding: utf-8 -*-
import os
import sys
import json
import pwd
import grp
from AHF_Stimulator import AHF_Stimulator
from AHF_Camera import AHF_Camera
[docs]class AHF_Settings (object):
"""
AHF_Settings is a class that reads, edits, and saves settings for the AutoheadFix program
"""
[docs] def __init__ (self, fileName):
"""
Makes a new settings object, loading from a given file, letting user choose from existing files, or making new settings from user
Will try to load a file if file name is passed in,
"""
if fileName is not None:
self.load (fileName)
else:
# look for experiment config files in the current directory, they start with AHFexp_ and end with .jsn
iFile=0
files = ''
for f in os.listdir('.'):
if f.startswith ('AFHexp_') and f.endswith ('.jsn'):
files += '\n' + str (iFile) + ':' + f
iFile +=1
if iFile == 0: # no files found, create one
print ('Unable to find an experiment config file, let\'s make a new configuration:')
fileNum=-1
else:
inputPrompt = 'Enter file number to load experiment config file, or -1 to make new config\n'
inputPrompt += files +'\n:'
fileNum = int (input (inputPrompt))
if fileNum == -1:
self.config_from_user()
else:
# file list starts with a separator (\n) so we split the list on \n and get fileNum + 1
# each list item starts with fileNum: so split the list item on ":" and get item 1 to get file name
self.load ((files.split('\n')[fileNum + 1]).split (':')[1])
[docs] def config_from_user (self):
"""
Queries the user for settings. This function may run before a camera or stimulator has been configured
We need to run static function for stimulator configuration and camera configuration
"""
self.fileName = ''
# solenoid opening times, for entry rewards and task rewards, and other reward related settings
self.entranceRewardTime = float (input ('Solenoid opening duration, in seconds, for entrance rewards:'))
self.taskRewardTime= float (input ('Solenoid opening duration,in seconds, for task rewards:'))
self.maxEntryRewards = int (input ('Maximum number of entry rewards that will be given per day:'))
self.entryRewardDelay = float (input('Delay, in seconds, before an entrance reward is given:'))
# head fix related settings
self.propHeadFix = float (input('proportion (0 to 1) of trials that are head-fixed:'))
self.propHeadFix = min (1, max (0, self.propHeadFix)) # make sure proportion is bewteen 0 and 1
self.skeddadleTime = float (input ('Time, in seconds, for mouse to get head off the contacts when session ends:'))
tempInput = input ('Send text messages if mouse exceeds criterion time in chamber?(Y or N):')
self.hasTextMsg = bool(tempInput [0] == 'y' or tempInput [0] == 'Y')
if self.hasTextMsg == True:
self.inChamberTimeLimit = float (input ('In-Chamber duration limit, seconds, before stopping head-fix trials and sending text messg:'))
self.phoneList =tuple (input('Phone numbers to receive a text message if mouse is in chamber too long:').split(','))
else:
self.inChamberTimeLimit = float(input('In-Chamber duration limit, seconds, before stopping head-fix trials:'))
# Camera related settings, in a dictionary, static function, don't need a camera object to be created
self.camParamsDict = AHF_Camera.dict_from_user ({})
# UDP stuff - make a tuple of IP address of other computers
tempInput = input ('Send UDP triggers to start secondary cameras (Y or N):')
self.hasUDP = bool(tempInput [0] == 'y' or tempInput [0] == 'Y')
if self.hasUDP == True:
self.UDPList =tuple (input('IP addresses of Pis running secondary cameras:').split (','))
self.cameraStartDelay = float (input ('Delay in seconds between sending UDP and toggling blue LED.'))
# Stimulator class
self.stimulator = AHF_Stimulator.get_stimulator_from_user ()
# static function to make a configration without needing a stimulator to configure it
self.stimDict = AHF_Stimulator.get_class(self.stimulator).dict_from_user({})
[docs] def save (self):
"""
Saves current settings to a dictionary and then sends dictionary to a json dictionary file
"""
# get name for new config file and massage it a bit
if self.fileName != '':
newConfig = input ('Enter a name to save config, or enter to use current name, \'' + self.fileName + '\':')
if newConfig == '':
newConfig = self.fileName
else:
newConfig = input ('Enter a name to save config as file:')
if newConfig != self.fileName:
newConfig = 'AFHexp_' + ''.join([c for c in newConfig if c.isalpha() or c.isdigit() or c=='_']) + '.jsn'
self.fileName = newConfig
# make a dictionary from configuration
configDict = {}
configDict ['entranceRewardTime'] = self.entranceRewardTime
configDict ['taskRewardTime']= self.taskRewardTime
configDict['maxEntryRewards'] = self.maxEntryRewards
configDict['entryRewardDelay'] = self.entryRewardDelay
configDict['propHeadFix'] = self.propHeadFix
configDict['skeddadleTime'] = self.skeddadleTime
configDict['hasTextMsg'] = self.hasTextMsg
if self.hasTextMsg == True:
configDict['inChamberTimeLimit'] = self.inChamberTimeLimit
configDict['phoneList'] = self.phoneList
configDict['hasUDP'] = self.hasUDP
if self.hasUDP == True:
configDict['UDPList']=self.UDPList
configDict['cameraStartDelay'] = self.cameraStartDelay
configDict['camParams'] = self.camParamsDict
configDict['stimulator'] = self.stimulator
configDict['stimParams'] = self.stimDict
# open the file name
with open (newConfig, 'w') as fp:
fp.write (json.dumps (configDict))
fp.close ()
uid = pwd.getpwnam ('pi').pw_uid
gid = grp.getgrnam ('pi').gr_gid
os.chown (newConfig, uid, gid) # we run AutoHeadFix as root for GPIO, so expicitly set ownership for easy editing
[docs] def load (self, file):
with open (file, 'r') as fp:
configDict = json.loads(fp.read())
fp.close()
self.fileName = file
self.entranceRewardTime = float (configDict.get ('entranceRewardTime', 30e-03))
self.taskRewardTime = float (configDict.get ('taskRewardTime', 30e-03))
self.maxEntryRewards = int (configDict.get ('maxEntryRewards', 100))
self.entryRewardDelay = float (configDict.get('entryRewardDelay', 0.5))
self.propHeadFix = float (configDict.get('propHeadFix', 1.0))
self.skeddadleTime = float (configDict.get ('skeddadleTime', 0.25))
self.inChamberTimeLimit = float (configDict.get ('inChamberTimeLimit',600))
self.hasTextMsg = bool (configDict.get ('hasTextMsg', False))
if self.hasTextMsg == True:
self.phoneList = tuple (configDict.get('phoneList'))
self.hasUDP = bool(configDict.get ('hasUDP', False))
if self.hasUDP == True:
self.UDPList = tuple (configDict.get ('UDPList'))
self.cameraStartDelay = float (configDict.get('cameraStartDelay'))
self.camParamsDict = configDict.get('camParams', {})
self.stimulator = configDict.get('stimulator')
self.stimDict = configDict.get('stimParams')
[docs] def show (self):
"""
Prints the current configuration stored in this AHF_Settings to the console, nicely formatted
:param: none
:returns: nothing
"""
print ('****************Current Auto-Head-Fix experiment Settings********************************')
print ('1:Entrance Reward Time (secs) =' + str (self.entranceRewardTime))
print ('2:Task Reward Time (secs) =' + str (self.taskRewardTime))
print ('3:Maximum Daily Entry Rewards =' + str (self.maxEntryRewards))
print ('4:Entry Reward Delay (secs) =' + str (self.entryRewardDelay))
print ('5:Proportion of Contacts to Head Fix (0-1) =' + str(self.propHeadFix))
print ('6:Time for Mouse to break contact after trial (secs) =' + str(self.skeddadleTime))
print ('7:Use Text Messaging =' + str (self.hasTextMsg))
if self.hasTextMsg == True:
print ('\t7_a:List of Phone Numbers=' + str(self.phoneList))
print ('8:Duration in chamber (secs) before stopping trials and sending message =' + str (self.inChamberTimeLimit))
else:
print ('8:Duration in chamber (secs) before stopping trials =' + str (self.inChamberTimeLimit))
print ('9:Send UDP triggers = ' + str (self.hasUDP))
if self.hasUDP == True:
print ('\t9_a:List of ip addresses for UDP = ' + str(self.UDPList))
print ('\t9_b:Camera start to LED ON delay (secs) =' + str (self.cameraStartDelay))
print ('10:Stimulator = ' + self.stimulator)
i =0
for key in sorted (self.stimDict.keys()):
print ('\t10_' + chr (97 + i) + ": " + key + ' = ' + str (self.stimDict[key]))
i+=1
[docs] def edit_from_user (self):
"""
Allows user to edit experiment settings, including stimulator settings, but not camera settings
user can either change the stimulaotr, or reconfigure it, but not both
:returns: code for mods - bit 0 = 1 is set if stimulator config is changed, bit 1 =2 is set if stimulator is changed
"""
#
editVal=0
while True:
self.show()
editNum = input ('Enter number of paramater to Edit, or 0 when done:')
if editNum == '0':
tempInput = input ('Save configuration to file (Yes or No):')
if tempInput[0] == 'Y' or tempInput [0] == 'y':
self.save ()
break
elif editNum == '1':
self.entranceRewardTime = float (input ('Solenoid opening duration, in seconds, for entrance rewards:'))
elif editNum == '2':
self.taskRewardTime= float (input ('Solenoid opening duration,in seconds, for task rewards:'))
elif editNum == '3':
self.maxEntryRewards = int (input ('Maximum number of entry rewards that will be given per day:'))
elif editNum =='4':
self.entryRewardDelay = float (input('Delay, in seconds, before an entrance reward is given:'))
elif editNum =='5':
self.propHeadFix = float (input('proportion (0 to 1) of trials that are head-fixed:'))
self.propHeadFix = min (1, max (0, self.propHeadFix)) # make sure proportion is bewteen 0 and 1
elif editNum=='6':
self.skeddadleTime = float (input ('Time, in seconds, for mouse to get head off the contacts when session ends:'))
elif editNum =='7':
tempInput = input ('Send text messages if mouse exceeds criterion time in chamber?(Y or N):')
self.hasTextMsg = bool(tempInput [0] == 'y' or tempInput [0] == 'Y')
if self.hasTextMsg == True and self.phoneList == '':
self.phoneList =tuple (input('Phone numbers to receive a text message if mouse is in chamber too long:').split(','))
elif editNum == '7a':
self.phoneList =tuple (input('Phone numbers to receive a text message if mouse is in chamber too long:').split(','))
elif editNum == '8':
self.inChamberTimeLimit = float (input ('Time limit in seconds for mouse being in the chamber before stopping headfixes, optionally sending text mssg:'))
elif editNum == '9':
tempInput = input ('Send UDP triggers to start secondary cameras (Y or N):')
self.hasUDP = bool(tempInput [0] == 'y' or tempInput [0] == 'Y')
if self.UDPList == True:
self.UDPList =tuple (input('IP addresses of Pis running secondary cameras:').split (','))
self.cameraStartDelay = float (input ('Delay in seconds between sending UDP and toggling blue LED.'))
elif editNum == '9a':
self.UDPList =tuple (input('IP addresses of Pis running secondary cameras:').split (','))
elif editNum == '9b':
self.cameraStartDelay = float (input ('Delay in seconds between sending UDP and toggling blue LED.'))
elif editNum == '10':
self.stimulator = AHF_Stimulator.get_stimulator_from_user ()
editVal = editVal | 2
elif editNum.split('_')[0] == '10':
editVal = editVal | 1
selectedKey = ord (editNum.split ('_')[1]) -97
i=0
for key in sorted (self.stimDict.keys()):
if i==selectedKey:
newValue = input ('Set ' + key + ' (currently ' + str (self.stimDict.get(key)) + ') to:')
self.stimDict.update ({key : newValue})
break
i+=1
return editVal
## for testing purposes
if __name__ == '__main__':
settings = AHF_Settings (None)
settings.edit_from_user()
settings.show()