# Source code for core.source.Source

# DESCRIPTION
#       Agilepy software
#
# NOTICE
#      Any information contained in this software
#      is property of the AGILE TEAM and is strictly
#      private and confidential.
#      Copyright (C) 2005-2020 AGILE Team.
#          Baroncelli Leonardo <leonardo.baroncelli@inaf.it>
#          Addis Antonio <antonio.addis@inaf.it>
#          Bulgarelli Andrea <andrea.bulgarelli@inaf.it>
#          Parmiggiani Nicolò <nicolo.parmiggiani@inaf.it>
#      All rights reserved.

#This program is free software: you can redistribute it and/or modify
#it under the terms of the GNU General Public License as published by
#the Free Software Foundation, either version 3 of the License, or
#(at your option) any later version.

#This program is distributed in the hope that it will be useful,
#but WITHOUT ANY WARRANTY; without even the implied warranty of
#MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#GNU General Public License for more details.

#You should have received a copy of the GNU General Public License
#along with this program.  If not, see <https://www.gnu.org/licenses/>.
from colorama import Fore, Back, Style

from agilepy.utils.AstroUtils import AstroUtils
from agilepy.core.source.Spectrum import Spectrum
from agilepy.core.source.SpatialModel import SpatialModel
from agilepy.core.source.MultiAnalysis import MultiAnalysis

from agilepy.core.CustomExceptions import SelectionParamNotSupported, \
                                          SourceParameterNotFound, \
                                          NotFreeableParamsError, \
                                          SourceTypeNotSupportedError, \
                                          XMLParseError, \
                                          SourcesAgileFormatParsingError, \
                                          SourceParamNotFoundError, \
                                          MultiOutputNotFoundError

class Source:
    """Base description of a source: name, spectrum, spatial model, MLE results.

    The methods of this class read and write parameters through ``self.get``
    and ``self.set``. Those accessors are not defined here: they must be
    provided by the concrete subclass (see ``getSource``).
    """

    # Mapping used by parseSourceTXTFormat:
    # spectrumType code -> (spectrum type name, ordered spectrum parameter names).
    _TXT_SPECTRA = {
        0: ("PowerLaw", ("index",)),
        1: ("PLExpCutoff", ("index", "cutoffEnergy")),
        2: ("PLSuperExpCutoff", ("index1", "cutoffEnergy", "index2")),
        3: ("LogParabola", ("index", "pivotEnergy", "curvature")),
    }

    # Column layout used by parseSourceTXTFormat for the 1st, 2nd and 3rd
    # spectrum parameter of a line:
    # (value column, min column, max column, free-bit index, scale or None).
    _TXT_PARAM_SLOTS = (
        (3, 11, 12, 2, -1.0),
        (9, 13, 14, 3, -1.0),
        (10, 15, 16, 4, None),
    )

    def __init__(self, name):
        """It creates a source with no spectrum and no spatial model.

        Args:
            name (str): Source name
        """
        self.name = name
        self.spectrum = None
        self.spatialModel = None
        self.multiAnalysis = MultiAnalysis()

    def getName(self):
        """It gets the name for the source

        Args:
            None

        Returns:
            name(str): Source name
        """
        return self.name

    def getFreeableParams(self):
        """It gets parameters to free

        Args:
            None

        Returns:
            List(str): the freeable parameters of the spectrum followed by
                the freeable parameters of the spatial model
        """
        return self.spectrum.getFreeableParams() + self.spatialModel.getFreeableParams()

    def setDistanceFromMapCenter(self, mapCenterL, mapCenterB):
        """It stores in spatialModel.dist the distance from the map center.

        If a MLE analysis is available (multiDate is set) the position is
        taken from multiL/multiB, falling back to multiStartL/multiStartB for
        a coordinate equal to -1. Otherwise the spatial model 'pos' is used.

        Args:
            mapCenterL: first coordinate of the map center
                (presumably galactic longitude - TODO confirm).
            mapCenterB: second coordinate of the map center
                (presumably galactic latitude - TODO confirm).

        Returns:
            None
        """
        multi = self.multiAnalysis

        if multi.multiDate["value"] is not None:
            sourceL = multi.multiL["value"]
            sourceB = multi.multiB["value"]
            # -1 marks a coordinate not provided by the analysis.
            if sourceL == -1:
                sourceL = multi.multiStartL["value"]
            if sourceB == -1:
                sourceB = multi.multiStartB["value"]
        else:
            sourceL = self.spatialModel.pos["value"][0]
            sourceB = self.spatialModel.pos["value"][1]

        self.spatialModel.dist["value"] = AstroUtils.distance(sourceL, sourceB, mapCenterL, mapCenterB)

    def getSelectionValue(self, paramName):
        """It returns the value used to filter sources by 'paramName'.

        Args:
            paramName (str): one of "name", "flux", "dist", "sqrtts" or
                "multisqrtts" (case insensitive).

        Raises:
            SelectionParamNotSupported: if 'paramName' is not supported.

        Returns:
            The source name, or the value of the corresponding parameter.
            For "flux" and "dist" the MLE value is preferred when a MLE
            analysis is available. For "sqrtts" None is returned when no MLE
            analysis is available.
        """
        paramName = paramName.lower()

        if paramName == "name":
            return self.name

        if paramName == "flux":
            if self.multiAnalysis.getVal("multiDate") is not None:
                return self.get("multiFlux")["value"]
            return self.get("flux")["value"]

        if paramName == "dist":
            if self.multiAnalysis.multiDate["value"] is not None:
                return self.get("multiDist")["value"]
            return self.get("dist")["value"]

        if paramName in ("multisqrtts", "sqrtts"):
            if self.multiAnalysis.getVal("multiDate") is not None:
                return self.get("multiSqrtTS")["value"]
            return None

        raise SelectionParamNotSupported(f"The selection parameter '{paramName}' is not supported!")

    def updateMultiAnalysis(self, multiAnalysisResult, mapCenterL, mapCenterB):
        """It replaces the stored MLE results with 'multiAnalysisResult'.

        Before the replacement, the previous MLE results (if any) are copied
        into the spectrum parameters, the position is moved to the previous
        peak position and the distance from the map center is recomputed.

        Args:
            multiAnalysisResult: the new MLE analysis results.
            mapCenterL: first coordinate of the map center.
            mapCenterB: second coordinate of the map center.

        Returns:
            None
        """
        # Swap last mle analysis results (if any) to spectrum and spatialModel
        if self.multiAnalysis.multiDate["value"] is not None:

            for attribute in self.spectrum.getAttributes():
                spectrumParamName = attribute["name"]

                # get the corresponding spectrum param name in MultiAnalysis
                multiParamName = MultiAnalysis.spectrumMultiMapping[spectrumParamName]

                # get the actual value
                multiParamValue = self.get(multiParamName)["value"]
                multiParamValueErr = self.get(multiParamName + "Err")["value"]

                if multiParamValue is not None:
                    self.set(spectrumParamName, {"value": multiParamValue, "err": multiParamValueErr})

            # New position
            self.spatialModel.pos["value"] = (self.multiAnalysis.multiLPeak["value"],
                                              self.multiAnalysis.multiBPeak["value"])

            # New distance from map center
            self.setDistanceFromMapCenter(mapCenterL, mapCenterB)

        # Override the MLE analysis results
        self.multiAnalysis = multiAnalysisResult

    @staticmethod
    def getSource(type, name):
        """It builds a source of the requested type.

        The first parameter shadows the builtin 'type': the name is kept
        because callers pass it as a keyword argument.

        Args:
            type (str): the source type. Only "PointSource" is supported.
            name (str): Source name

        Raises:
            SourceTypeNotSupportedError: if 'type' is not supported.

        Returns:
            The new source.
        """
        if type != "PointSource":
            raise SourceTypeNotSupportedError(f"The source type '{type}' is not supported.")
        return PointSource(name)

    @staticmethod
    def parseSourceXMLFormat(sourceRoot):
        """It builds a source from a <source> XML element.

        Args:
            sourceRoot: the <source> element. Its attributes are forwarded to
                getSource(); its children must be <spectrum> or
                <spatialModel> elements containing <parameter> elements.

        Raises:
            XMLParseError: if an unexpected tag is found.

        Returns:
            The new source.
        """
        if sourceRoot.tag != "source":
            raise XMLParseError(f"Tag <source> expected, '{sourceRoot.tag}' found.")

        newSource = Source.getSource(**sourceRoot.attrib)

        for sourceDescription in sourceRoot:

            tag = sourceDescription.tag

            if tag not in ["spectrum", "spatialModel"]:
                raise XMLParseError(f"Tag <spectrum> or <spatialModel> expected, '{sourceDescription.tag}' found.")

            if tag == "spectrum":
                newSource.spectrum = Spectrum.getSpectrum(sourceDescription.attrib["type"])

            if tag == "spatialModel":
                newSource.spatialModel = SpatialModel.getSpatialModel(sourceDescription.attrib["type"])
                newSource.set("locationLimit", {'name': 'locationLimit', 'value': sourceDescription.attrib["location_limit"]})

            for parameter in sourceDescription:

                if parameter.tag != "parameter":
                    raise XMLParseError(f"Tag <parameter> expected, '{parameter.tag}' found.")

                newSource.set(parameter.attrib["name"], parameter.attrib)

        return newSource

    @staticmethod
    def _decodeFixflag(fixflag):
        """It decodes the 'fixflag' column of the text format.

        Args:
            fixflag (int): the value of the fixflag column.

        Returns:
            (list, int): the list of free flags, least significant bit first,
                and the free flag of the source position.
        """
        if fixflag == 0:
            return [0] * 6, 0

        if fixflag == 32:
            # Special case: the position free flag is 2 and it is read from
            # the last element instead of the second one.
            freeBits = [0, 0, 0, 0, 0, 2]
            return freeBits, freeBits[5]

        freeBits = [int(bit) for bit in reversed(f'{fixflag:06b}')]
        return freeBits, freeBits[1]

    @staticmethod
    def parseSourceTXTFormat(txtLine):
        """It builds a point source from one line of the AGILE text format.

        The line must contain 17 whitespace-separated elements. The elements
        that are read are: 0 flux, 1 and 2 position, 3 index, 4 fixflag,
        6 name, 7 location limit, 8 spectrum type, 9-16 further spectrum
        parameters and limits. The element 5 is not used.

        Args:
            txtLine (str): a line describing a source.

        Raises:
            SourcesAgileFormatParsingError: if the line does not contain 17
                elements or the spectrum type is not in [0,1,2,3].

        Returns:
            The new source.
        """
        # str.split() with no separator splits on any run of whitespace and
        # drops empty tokens, so tabs, repeated spaces and a trailing newline
        # never produce spurious elements.
        elements = txtLine.split()

        # each line is a source
        if len(elements) != 17:
            raise SourcesAgileFormatParsingError("The number of elements on the line {} is not 17, but {}".format(txtLine, len(elements)))

        flux = float(elements[0])
        glon = elements[1]
        glat = elements[2]
        fixflag = int(elements[4])
        name = elements[6]
        locationLimit = int(float(elements[7]))
        spectrumType = int(elements[8])

        freeBits, freeBitsPosition = Source._decodeFixflag(fixflag)

        newSource = Source.getSource(name=name, type="PointSource")

        if spectrumType not in Source._TXT_SPECTRA:
            raise SourcesAgileFormatParsingError("spectrumType={} not supported. Supported: [0,1,2,3]".format(spectrumType))

        spectrumName, spectrumParamNames = Source._TXT_SPECTRA[spectrumType]

        newSource.spectrum = Spectrum.getSpectrum(spectrumName)

        newSource.set("flux", {"name": "flux", "free": freeBits[0], "value": flux})

        # The n-th spectrum parameter always reads the n-th column layout.
        for paramName, slot in zip(spectrumParamNames, Source._TXT_PARAM_SLOTS):
            valueCol, minCol, maxCol, freeBit, scale = slot
            attributes = {"name": paramName, "free": freeBits[freeBit]}
            if scale is not None:
                attributes["scale"] = scale
            attributes["value"] = float(elements[valueCol])
            attributes["min"] = float(elements[minCol])
            attributes["max"] = float(elements[maxCol])
            newSource.set(paramName, attributes)

        newSource.spatialModel = SpatialModel.getSpatialModel("PointSource")

        newSource.set("pos", {"name": "pos", "value": f"({glon},{glat})", "free": freeBitsPosition})
        newSource.set("locationLimit", {"name": "locationLimit", "value": locationLimit})

        return newSource

    @staticmethod
    def fromDictionary(sourceName, dictData):
        """It builds a point source from a dictionary of values.

        Args:
            sourceName (str): Source name
            dictData (dict): it must contain "glon", "glat", "spectrumType"
                and one value for each parameter of the requested spectrum.

        Raises:
            SourceParamNotFoundError: if a required key is missing.

        Returns:
            The new source.
        """
        # check type and spatial model parameters
        for requiredKey in ("glon", "glat", "spectrumType"):
            if requiredKey not in dictData:
                raise SourceParamNotFoundError(f"'{requiredKey}' is a required param. Please add it to the 'sourceObject' input dictionary")

        newSource = Source.getSource(name=sourceName, type="PointSource")

        newSource.spatialModel = SpatialModel.getSpatialModel("PointSource")
        newSource.set("pos", {"value": (dictData["glon"], dictData["glat"])})
        newSource.set("locationLimit", {"value": 0})

        newSource.spectrum = Spectrum.getSpectrum(dictData["spectrumType"])

        spectrumKeys = [param["name"] for param in newSource.spectrum.getAttributes()]

        # check spectrum parameters: all of them are validated before any
        # value is written.
        for spectrumKey in spectrumKeys:
            if spectrumKey not in dictData:
                raise SourceParamNotFoundError(f"'{spectrumKey}' is a required param. Please add it to the 'sourceObject' input dictionary")

        for spectrumKey in spectrumKeys:
            newSource.set(spectrumKey, {"value": dictData[spectrumKey]})

        return newSource
class PointSource(Source):
    """A source whose spatial model is the "PointSource" spatial model.

    It provides the parameter accessors (get/set) that look a parameter up in
    the spectrum, in the spatial model and in the MLE analysis results, and
    the textual representation of the source.
    """

    def __init__(self, name):
        """It creates a point source with no spectrum.

        Args:
            name (str): Source name
        """
        super().__init__(name)
        self.spatialModel = SpatialModel.getSpatialModel("PointSource")

    def _findParameterOwner(self, parameterName):
        """It returns the component owning 'parameterName', or None.

        The components are searched in this order: spectrum, spatial model,
        MLE analysis results. A component that is None is skipped.
        """
        for component in (self.spectrum, self.spatialModel, self.multiAnalysis):
            if component is not None and parameterName in vars(component):
                return component
        return None

    def get(self, parameterName):
        """It returns a source's parameter.

        Args:
            parameterName (str): the name of the source's parameter.

        Raises:
            SourceParameterNotFound: if the source's parameter is not found.

        Returns:
            A dictionary containing the source's attributes.

        Example:
            >>> s.get("index")
            >>> s.get("pos")
            >>> s.get("multiFlux")
        """
        owner = self._findParameterOwner(parameterName)

        if owner is None:
            raise SourceParameterNotFound(f"Cannot perform get(), {parameterName} is not found.")

        return getattr(owner, parameterName)

    def getVal(self, parameterName):
        """It returns a source's parameter value.

        Args:
            parameterName (str): the name of the source's parameter.

        Raises:
            SourceParameterNotFound: if the source's parameter is not found.

        Returns:
            The value of the source's parameter.

        Example:
            >>> s.getVal("index")
            >>> s.getVal("pos")
            >>> s.getVal("multiFlux")
        """
        return self.get(parameterName)["value"]

    def set(self, parameterName, attributeValueDict):
        """It sets a source's parameter.

        Args:
            parameterName (str): the name of the source's parameter.
            attributeValueDict (dict): the attributes to set and their values.

        Raises:
            ValueError: if 'attributeValueDict' is not a dictionary.
            SourceParameterNotFound: if the source's parameter is not found.

        Returns:
            The value returned by setParameter() of the component that owns
            the parameter.

        Example:
            >>> s.set("index",{"value":1, "min":10})
        """
        # isinstance() also accepts dict subclasses (e.g. OrderedDict).
        if not isinstance(attributeValueDict, dict):
            raise ValueError("The input parameter 'attributeValueDict' must be a dictionary!")

        owner = self._findParameterOwner(parameterName)

        if owner is None:
            raise SourceParameterNotFound(f"Cannot perform set(), {parameterName} is not found.")

        return owner.setParameter(parameterName, attributeValueDict)

    def setVal(self, parameterName, parameterValue):
        """It sets the value of a source's parameter.

        Args:
            parameterName (str): the name of the source's parameter.
            parameterValue: the new value of the parameter.

        Returns:
            None

        Example:
            >>> s.setVal("index", 1)
        """
        self.set(parameterName, {"value": parameterValue})

    def getFreeParams(self):
        """It returns the source's attributes that are free to vary.

        Returns:
            The list of attributes that are free to vary.

        Example:
            >>> s.getFreeParams()
            >>> ["flux", "index"]
        """
        return self.spectrum.getFreeParams() + self.spatialModel.getFreeParams()

    def setSpectrum(self, spectrumType):
        """It replaces the spectrum with a new one of type 'spectrumType'."""
        self.spectrum = Spectrum.getSpectrum(spectrumType)

    def setSpatialModel(self, spatialModelType):
        """It replaces the spatial model with a new one of type 'spatialModelType'."""
        self.spatialModel = SpatialModel.getSpatialModel(spatialModelType)

    def setMultiAnalysis(self):
        """It replaces the MLE analysis results with a new MultiAnalysis object."""
        self.multiAnalysis = MultiAnalysis()

    def setFreeAttributeValueOf(self, parameterName, freeval):
        """It sets the 'free' attribute of a parameter.

        Args:
            parameterName (str): the name of the source's parameter.
            freeval: the new value of the 'free' attribute.

        Raises:
            NotFreeableParamsError: if the parameter has no 'free' attribute.

        Returns:
            bool: True if the 'free' attribute has been changed, False if it
                already had the requested value.
        """
        parameter = self.get(parameterName)

        if "free" not in parameter:
            raise NotFreeableParamsError(f"The parameter '{parameterName}' cannot be free")

        willChange = parameter["free"] != freeval

        if willChange:
            self.set(parameterName, {"free": freeval})

        return willChange

    def bold(self, ss):
        """It wraps 'ss' in the ANSI escape codes for bold text."""
        return '\033[1m' + ss + '\033[0m'

    def colorRed(self, ss):
        """It wraps 'ss' in the colorama codes for a red foreground."""
        return Fore.RED + ss + Fore.RESET

    def colorBlue(self, ss):
        """It wraps 'ss' in the colorama codes for a blue foreground."""
        return Fore.BLUE + ss + Fore.RESET

    def important(self, ss):
        """It wraps 'ss' in the colorama codes for bright red text."""
        return Style.BRIGHT + Fore.RED + ss + Style.RESET_ALL

    @staticmethod
    def _formatScientific(value):
        """It formats 'value' as '.4e', or as 'None' when it is None.

        Applying the '.4e' format to None raises TypeError: printing a source
        with an unset value must not fail.
        """
        if value is None:
            return str(value)
        return f"{value:.4e}"

    def __str__title(self):
        """It returns the header: source name, source class and sqrt(ts)."""
        strr = '\n-----------------------------------------------------------'
        strr += self.bold(f'\n Source name: {self.name} ({type(self).__name__})')

        multiSqrtTs = self.multiAnalysis.getVal("multiSqrtTS")

        if multiSqrtTs is not None:
            label = " => sqrt(ts): " + str(multiSqrtTs)
            # A sqrt(ts) greater than 4 is highlighted.
            if multiSqrtTs > 4:
                strr += self.important(label)
            else:
                strr += self.bold(label)

        return strr

    def __str__spectrumType(self):
        """It returns the line showing the class name of the spectrum."""
        return f"\n * {self.bold('Spectrum type')}: {type(self.spectrum).__name__}"

    def __str__freeParams(self):
        """It returns the line listing the free parameters (or 'none')."""
        freeParams = self.getFreeParams()

        if not freeParams:
            return f'\n * {self.bold("Free parameters")}: none'

        return f'\n * {self.bold("Free parameters")}: ' + ' '.join(freeParams)

    def __str__sourceParams(self):
        """It returns the spectrum parameters, the position and the distance."""
        strr = f'\n * {self.bold("Initial source parameters")}:'

        for param in self.spectrum.getAttributes():
            hasError = param['err'] is not None and param['err'] != 0

            if param["name"] == "flux":
                strr += f"\n\t- {param['name']} ({param['um']}): {self._formatScientific(param['value'])}"
                if hasError:
                    strr += f" +/- {param['err']:.4e}"
            else:
                strr += f"\n\t- {param['name']} {param['um']}: {param['value']}"
                if hasError:
                    strr += f" +/- {param['err']}"

        for param in self.spatialModel.getAttributes():
            if param["name"] == "pos":
                strr += f"\n\t- Source position {param['um']}: {param['value']}"
            if param["name"] == "dist" and param['value'] is not None:
                strr += f"\n\t- Distance from map center ({param['um']}): {round(param['value'], 4)}"

        return strr

    def __str__multiAnalisys(self):
        """It returns the results of the last MLE analysis.

        An empty string is returned when no MLE analysis is available
        (multiDate is None).
        """
        multi = self.multiAnalysis

        if multi.multiDate["value"] is None:
            return ''

        strr = f'\n * {self.bold("Last MLE analysis")} ({multi.multiDate["value"]}):'

        # spectrum parameters
        # get the corresponding multi spectrum parameters
        spectrumParamsNames = [a["name"] for a in self.spectrum.getAttributes()]

        for spectrumParamName, multiSpectrumParamName in MultiAnalysis.spectrumMultiMapping.items():

            if spectrumParamName not in spectrumParamsNames:
                continue

            param = multi.get(multiSpectrumParamName)
            paramErr = multi.get(multiSpectrumParamName + "Err")
            hasError = paramErr['value'] is not None and paramErr['value'] != 0

            if spectrumParamName == "flux":
                strr += f"\n\t- {spectrumParamName} ({param['um']}): {self._formatScientific(param['value'])}"
                if hasError:
                    strr += f" +/- {paramErr['value']:.4e}"
            else:
                strr += f"\n\t- {spectrumParamName} {param['um']}: {param['value']}"
                if hasError:
                    strr += f" +/- {paramErr['value']}"

        # ag_multi parameters
        strr += f'\n\t- counts: {multi.getVal("multiCounts")}'
        if multi.getVal("multiCountsErr") is not None and multi.getVal("multiCountsErr") != 0:
            strr += f' +/- {multi.get("multiCountsErr")["value"]}'

        strr += f'\n\t- upper limit({multi.get("multiUL")["um"]}): {multi.getVal("multiUL")}'

        strr += f'\n\t- ergLog(erg/cm2s): {multi.getVal("multiErgLog")}'
        if multi.getVal("multiErgLogErr") is not None and multi.getVal("multiErgLogErr") != 0:
            strr += f' +/- {multi.getVal("multiErgLogErr")}'

        strr += f'\n\t- galCoeff: {multi.getVal("multiGalCoeff")}'
        if multi.getVal("multiGalErr") is not None and sum(multi.getVal("multiGalErr")) != 0:
            strr += f' +/- {multi.getVal("multiGalErr")}'

        strr += f'\n\t- isoCoeff: {multi.getVal("multiIsoCoeff")}'
        if multi.getVal("multiIsoErr") is not None and sum(multi.getVal("multiIsoErr")) != 0:
            strr += f' +/- {multi.getVal("multiIsoErr")}'

        strr += f'\n\t- exposure({multi.get("multiExp")["um"]}): {multi.getVal("multiExp")}'
        strr += f'\n\t- exp-ratio: {multi.getVal("multiExpRatio")}'
        strr += f'\n\t- L_peak: {multi.getVal("multiLPeak")}'
        strr += f'\n\t- B_peak: {multi.getVal("multiBPeak")}'
        strr += f'\n\t- Distance from start pos: {multi.getVal("multiDistFromStartPositionPeak")}'
        strr += '\n\t- ellipse position:'
        strr += f'\n\t - L: {multi.getVal("multiL")}'
        strr += f'\n\t - B: {multi.getVal("multiB")}'
        strr += f'\n\t - Distance from start pos: {multi.getVal("multiDistFromStartPosition")}'
        strr += f'\n\t - radius of circle: {multi.getVal("multir")}'
        strr += '\n\t - ellipse:'
        strr += f'\n\t\t - a: {multi.getVal("multia")}'
        strr += f'\n\t\t - b: {multi.getVal("multib")}'
        strr += f'\n\t\t - phi: {multi.getVal("multiphi")}'
        strr += '\n-----------------------------------------------------------'

        return strr

    def __str__(self):
        """It returns the full textual description of the source."""
        sections = (
            self.__str__title(),
            self.__str__spectrumType(),
            self.__str__freeParams(),
            self.__str__sourceParams(),
            self.__str__multiAnalisys(),
        )
        return ''.join(sections)