Source code for api.AGVisibility

# DESCRIPTION
#       Agilepy software
#
# NOTICE
#      Any information contained in this software
#      is property of the AGILE TEAM and is strictly
#      private and confidential.
#      Copyright (C) 2005-2020 AGILE Team.
#          Baroncelli Leonardo <leonardo.baroncelli@inaf.it>
#          Addis Antonio <antonio.addis@inaf.it>
#          Bulgarelli Andrea <andrea.bulgarelli@inaf.it>
#          Parmiggiani Nicolò <nicolo.parmiggiani@inaf.it>
#      All rights reserved.

#This program is free software: you can redistribute it and/or modify
#it under the terms of the GNU General Public License as published by
#the Free Software Foundation, either version 3 of the License, or
#(at your option) any later version.

#This program is distributed in the hope that it will be useful,
#but WITHOUT ANY WARRANTY; without even the implied warranty of
#MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#GNU General Public License for more details.

#You should have received a copy of the GNU General Public License
#along with this program.  If not, see <https://www.gnu.org/licenses/>.

import numpy as np
import os
from astropy.coordinates import SkyCoord
from astropy import units as u
from astropy.io import fits
from astropy.table import Table
from astropy.time import Time
from os.path import join, expandvars
from pathlib import Path

from agilepy.core.AGBaseAnalysis import AGBaseAnalysis
from agilepy.utils.AstroUtils import AstroUtils
from agilepy.utils.Utils import Utils

[docs]class AGVisibility(AGBaseAnalysis): """This class contains the high-level API methods you can use to run the visibility analysis. This class requires you to setup a ``yaml configuration file`` to specify the software's behaviour. Class attributes: Attributes: config (:obj:`AgilepyConfig`): it is used to read/update configuration values. logger (:obj:`AgilepyLogger`): it is used to log messages with different importance levels. """
[docs] def __init__(self, configurationFilePath): """AGVisibility constructor. Args: configurationFilePath (str): the relative or absolute path to the yaml configuration file. Example: >>> from agilepy.api import AGVisibility >>> ageng = AGVisibility('agconfig.yaml') """ # Load the AGBaseAnalysis object and attributes super().__init__(configurationFilePath) self.config.loadConfigurationsForClass("AGVisibility") self.logger = self.agilepyLogger.getLogger(__name__, "AGVisibility") # Set attributes self._agileTable = None self._fermiTable = None # End initialization self.logger.info("AGVisibility initialized")
[docs] @staticmethod def getConfiguration(confFilePath, outputDir, logFile, step=30, timeType="tt", tmin="null", tmax="null", fermiSpacecraftFile=None, coord1="null", coord2="null", frame="icrs", userName="my_name", sourceName="vis-source", verboselvl=0 ): """Utility method to create a configuration file. Args: confFilePath (str): the path and filename of the configuration file that is going to be created. outputDir (str): the path to the output directory. The output directory will be created using the following format: 'userName_sourceName_todaydate'. logFile (str): the path to the AGILE logfile containing spacecraft information. fermiSpacecraftFile (str): the path to the Fermi spacecraft file. Optional. step (float): time interval in seconds between 2 consecutive points in the resulting table. Minimum accepted value: 0.1. Default is 30. timeType (str): the time format of tmin and tmax. tmin, tmax (float): inferior, superior observation time limit to analize. coord1, coord2 (float): the coordinates of the source used to compute the offaxis angle. frame (str): the reference frame of the coordinates. userName (str): the username of who is running the software. sourceName (str): the name of the source. verboselvl (int): the verbosity level of the console output. Message types: level 0 => critical, warning, level 1 => critical, warning, info, level 2 => critical, warning, info, debug. Returns: None """ configuration = f""" output: outdir: \"{outputDir}\" filenameprefix: \"visibility_product\" sourcename: \"{sourceName}\" username: \"{userName}\" verboselvl: {verboselvl} input: logfile: \"{logFile}\" fermi_spacecraft_file: {fermiSpacecraftFile} selection: step: {step} timetype: \"{timeType}\" tmin: {tmin} tmax: {tmax} source: coord1: {coord1} coord2: {coord2} frame: \"{frame}\" """ full_file_path = Utils._expandEnvVar(confFilePath) parent_directory = Path(full_file_path).absolute().parent parent_directory.mkdir(exist_ok=True, parents=True) with open(full_file_path,"w") as cf: cf.write(configuration) return None
@property def agileVisibility(self): return self._agileTable @property def fermiVisibility(self): return self._fermiTable
[docs] def computePointingDirection(self, logfilesIndex=None, tmin=None, tmax=None, timetype=None, step=None, src_x=None, src_y=None, frame=None, writeFiles=True): """ Compute the AGILE pointing direction and angular separation from a source in the sky. Values which are None are retreived from the configuration file. The pointing direction is extracted from the ATTITUDE_RA_Y and ATTITUDE_DEC_Y columns of the log files. Args: logfilesIndex (str): the index file for the logfiles wiith spacecraft information. tmin, tmax (float): inferior, superior observation time limit to analize. timetype (str): the time format of tmin and tmax. step (integer): time interval in seconds between 2 consecutive points the table. Minimum accepted value: 0.1. src_x, src_y (float): the coordinates of the source used to compute the offaxis angle, in degrees. frame (str): the reference frame of the coordinates. writeFiles (bool): if True, write the visibility table. Returns: visibility_tab (astropy.table.Table): the table with the AGILE pointing direction and source separation in degrees. """ # If arguments are not provided explicitly, set them from the configuration logfilesIndex = logfilesIndex if logfilesIndex is not None else self.config.getOptionValue("logfile") if not os.path.isfile(logfilesIndex): raise FileNotFoundError tmin = tmin if tmin is not None else self.config.getOptionValue("tmin") tmax = tmax if tmax is not None else self.config.getOptionValue("tmax") timetype = timetype if timetype is not None else self.config.getOptionValue("timetype") if timetype != "TT": tmin = AstroUtils.convert_time_to_agile_seconds(Time(tmin, format=timetype)) tmax = AstroUtils.convert_time_to_agile_seconds(Time(tmin, format=timetype)) step = step if step is not None else self.config.getOptionValue("step") if step < 0.1: self.logger.critical(f"step {step} cannot be < 0.1") raise ValueError(f"\'step\' {step} cannot be < 0.1") src_x = src_x if src_x is not None else self.config.getOptionValue("coord1") src_y = src_y if src_y is not None else self.config.getOptionValue("coord2") frame = frame if frame is not None else self.config.getOptionValue("frame") ########################################################## # Get the log files covering the time interval logFiles = self._getLogsFileInInterval(logfilesIndex, tmin, tmax) self.logger.info(f"Found {len(logFiles)} log files satisfying the interval {tmin}-{tmax}") if not logFiles: self.logger.warning(f"No log files are compatible with tmin {tmin} and tmax {tmax}") return Table() total = len(logFiles) # Compute the Pointing Direction of AGILE self.logger.info(f"Computing AGILE pointing direction. Please wait..") all_time_start = [] all_time_stop = [] all_agile_ra = [] all_agile_dec = [] # Compute logfile by logfile for idx, logFile in enumerate(logFiles): idx+= 1 self.logger.info(f" {idx}/{total} {logFile}") doTimeMask = (idx == 1)or(idx == total) # True for first and last file. list_time_start, list_time_stop, list_agile_ra, list_agile_dec = self._computePointingPerFile(doTimeMask, logFile, tmin, tmax, step) # Append results all_time_start.append(list_time_start) all_time_stop.append(list_time_stop) all_agile_ra.append(list_agile_ra) all_agile_dec.append(list_agile_dec) # Concatenate results into numpy arrays all_time_start = np.concatenate(all_time_start) all_time_stop = np.concatenate(all_time_stop) all_agile_ra = np.concatenate(all_agile_ra) all_agile_dec = np.concatenate(all_agile_dec) # Convert to Astropy Table visibility_tab = Table( [all_time_start, all_time_stop, all_agile_ra, all_agile_dec], names=('TT_start', 'TT_stop', 'AGILE_RA', 'AGILE_DEC'), meta = {} ) visibility_tab['MJD_start'] = AstroUtils.convert_time_from_agile_seconds(visibility_tab['TT_start'].data).mjd visibility_tab['MJD_stop' ] = AstroUtils.convert_time_from_agile_seconds(visibility_tab['TT_stop' ].data).mjd # Add Source offaxis angle column try: target = SkyCoord(src_x, src_y, frame=frame, unit="deg") pointings = SkyCoord(ra=all_agile_ra, dec=all_agile_dec, frame="icrs", unit="deg") separations= target.separation(pointings) visibility_tab['offaxis_angle_deg'] = separations.to("deg").value visibility_tab.meta["source_ra_deg"] = target.ra.deg visibility_tab.meta["source_dec_deg"]= target.dec.deg self.logger.info(f"Computed the source offaxis angle from the AGILE pointing direction for a source at RA={src_x}, DEC={src_y}, frame={frame}") except Exception as e: self.logger.error(f"Cannot compute the pointing-target distance: {e}") # Set variable self._agileTable = visibility_tab # Write file if writeFiles: self.logger.info(f"Writing Visibility table in: {self.outdir}") output_directory = Path(self.outdir).absolute().joinpath("offaxis_data") output_directory.mkdir(exist_ok=True, parents=True) filenamePath = output_directory.joinpath(f"offaxis_distances_{tmin}_{tmax}") visibility_tab.write(filenamePath.with_suffix(".csv"), format="csv", overwrite=True) self.logger.info(f"Produced: {filenamePath}") self.logger.info(f"Done.") return visibility_tab
def _computePointingPerFile(self, doTimeMask, logFile, tmin_start, tmax_start, step): """Compute the AGILE pointing direction for a single log file. Args: logFile (str): path to the logfile with spacecraft data. doTimeMask (bool): determine if a time mask must be applied to filter out the data according to the times set in the analysis. tmin_start, tmax_start (float): start and stop time for the time mask. step (float): filtering step in seconds. Returns: list_time_start,list_time_stop,list_agile_ra,list_agile_dec (list(float)): lists of the start and stop times and the satellite pointing direction. """ logFile = Utils._expandEnvVar(logFile) with fits.open(logFile) as hdulist: SC = hdulist[1].data self.logger.debug(f"Total events: {len(SC['TIME'])}") self.logger.debug(f"tmin: {tmin_start}") self.logger.debug(f"tmin log file: {SC['TIME'][0]}") self.logger.debug(f"tmax: {tmax_start}") self.logger.debug(f"tmax log file: {SC['TIME'][-1]}") self.logger.debug(f"Do time mask? {doTimeMask}") if doTimeMask: self.logger.debug(f"How many times are >= tmin_start? {np.sum(SC['TIME'] >= tmin_start)}") self.logger.debug(f"How many times are <= tmax_start? {np.sum(SC['TIME'] <= tmax_start)}") # Filtering out booleanMask = np.logical_and(SC['TIME'] >= tmin_start, SC['TIME'] <= tmax_start) AGILE_TIME = SC['TIME'][booleanMask] ATTITUDE_RA_Y = SC['ATTITUDE_RA_Y'][booleanMask] ATTITUDE_DEC_Y= SC['ATTITUDE_DEC_Y'][booleanMask] self.logger.debug(f"Time mask: {np.sum(np.logical_not(booleanMask))} values skipped") else: AGILE_TIME = SC['TIME'] ATTITUDE_RA_Y = SC['ATTITUDE_RA_Y'] ATTITUDE_DEC_Y= SC['ATTITUDE_DEC_Y'] # This is to avoid problems with moments for which the AGILE pointing was set to RA=NaN, DEC=NaN booleanMaskRA = np.logical_not(np.isnan(ATTITUDE_RA_Y)) booleanMaskDEC= np.logical_not(np.isnan(ATTITUDE_DEC_Y)) booleanMaskRADEC = np.logical_or(booleanMaskRA, booleanMaskDEC) AGILE_TIME = AGILE_TIME[booleanMaskRA] ATTITUDE_RA_Y= ATTITUDE_RA_Y[booleanMaskRADEC] ATTITUDE_DEC_Y= ATTITUDE_DEC_Y[booleanMaskRADEC] self.logger.debug(f"Not-null mask RA/DEC (at least one NULL): {np.sum(np.logical_not(booleanMaskRADEC))} values skipped") deltatime = 0.1 # AGILE attitude is collected every 0.1 s index_ti = 0 index_tf = len(AGILE_TIME)-1 self.logger.debug(f"Step is: {step}") indexstep = int(step*10) # if step 0.1, indexstep=1 => all values # if step 1, indexstep=10=> one value on 10 values self.logger.debug(f"indexstep is: {indexstep}") self.logger.debug(f"Number of separations to be computed: {index_tf/indexstep}") # Return results list_time_start= AGILE_TIME[index_ti:index_tf:indexstep] list_time_stop = AGILE_TIME[index_ti:index_tf:indexstep]+deltatime list_agile_ra = ATTITUDE_RA_Y[index_ti:index_tf:indexstep] list_agile_dec = ATTITUDE_DEC_Y[index_ti:index_tf:indexstep] return list_time_start,list_time_stop,list_agile_ra,list_agile_dec def _getLogsFileInInterval(self, logfilesIndex, tmin, tmax): """Select the log files covering the time interval [tmin, tmax]. Args: logfilesIndex (str): Inex file for the log files. tmin, tmax (float): Minimum and maximum time in AGILE seconds (TT). Returns: logsFiles (list(str)): List of log files covering the time interval [tmin, tmax]. """ self.logger.debug( "Selecting files from %s [%d to %d]",logfilesIndex, tmin, tmax) logsFiles = [] with open(logfilesIndex, "r") as lfi: lines = [line.strip() for line in lfi.readlines()] for line in lines: elements = line.split(" ") logFilePath = elements[0] logFileTmin = float(elements[1]) logFileTmax = float(elements[2]) if logFileTmin <= tmax and tmin <= logFileTmax: # print(logFileTmin,",",logFileTmax) logsFiles.append(logFilePath) return logsFiles
[docs] def getFermiPointing(self, fermiSpacecraftFile=None, tmin=None, tmax=None, timetype=None, src_x=None, src_y=None, frame=None, writeFiles=True): """ Compute the Fermi pointing direction and angular separation from a source in the sky. Values which are None are retreived from the configuration file. The pointing direction is extracted from the RA_SCZ and DEC_SCZ columns of the spacecraft file. Args: fermiSpacecraftFile (str): the path to the Fermi spacecraft file. If None, it is retrieved from the configuration file. tmin, tmax (float): inferior, superior observation time limit to analize. timetype (str): the time format of tmin and tmax. src_x, src_y (float): the coordinates of the source used to compute the offaxis angle, in degrees. frame (str): the reference frame of the coordinates. writeFiles (bool): if True, write the visibility table. Returns: visibility_tab (astropy.table.Table): the table with the Fermi pointing direction and source separation in degrees. """ fermiSpacecraftFile = fermiSpacecraftFile if fermiSpacecraftFile is not None else self.config.getOptionValue("fermi_spacecraft_file") if not os.path.isfile(fermiSpacecraftFile): self.logger.error(f"Fermi spacecraft file {fermiSpacecraftFile} not found.") raise FileNotFoundError(f"Fermi spacecraft file {fermiSpacecraftFile} not found.") tmin = tmin if tmin is not None else self.config.getOptionValue("tmin") tmax = tmax if tmax is not None else self.config.getOptionValue("tmax") timetype = timetype if timetype is not None else self.config.getOptionValue("timetype") if timetype != "TT": tmin = AstroUtils.convert_time_to_agile_seconds(Time(tmin, format=timetype)) tmax = AstroUtils.convert_time_to_agile_seconds(Time(tmin, format=timetype)) src_x = src_x if src_x is not None else self.config.getOptionValue("coord1") src_y = src_y if src_y is not None else self.config.getOptionValue("coord2") frame = frame if frame is not None else self.config.getOptionValue("frame") ########################################################## # Read Fermi Spacecraft file visibility_tab = Table.read(fermiSpacecraftFile, hdu="SC_DATA") # Select relevant columns visibility_tab = visibility_tab[["START", "STOP", "RA_SCZ", "DEC_SCZ"]] visibility_tab.rename_column("START", "MET_START") visibility_tab.rename_column("STOP", "MET_STOP") # Select data in the requested time interval tmin_met = AstroUtils.time_agile_to_fermi(tmin) tmax_met = AstroUtils.time_agile_to_fermi(tmax) visibility_tab = visibility_tab[(visibility_tab['MET_START'] >= tmin_met) & (visibility_tab['MET_STOP'] <= tmax_met)] # Time Conversion visibility_tab['TT_start']= AstroUtils.time_fermi_to_agile(visibility_tab['MET_START'].data) visibility_tab['TT_stop'] = AstroUtils.time_fermi_to_agile(visibility_tab['MET_STOP'].data) visibility_tab['MJD_start']= AstroUtils.convert_time_from_fermi_seconds(visibility_tab['MET_START'].data).mjd visibility_tab['MJD_stop'] = AstroUtils.convert_time_from_fermi_seconds(visibility_tab['MET_STOP'].data).mjd # Add Source offaxis angle column try: target = SkyCoord(src_x, src_y, frame=frame, unit="deg") pointings = SkyCoord(ra=visibility_tab['RA_SCZ'], dec=visibility_tab['DEC_SCZ'], frame="icrs", unit="deg") separations= target.separation(pointings) visibility_tab['offaxis_angle_deg'] = separations.to("deg").value visibility_tab.meta["source_ra_deg"] = target.ra.deg visibility_tab.meta["source_dec_deg"]= target.dec.deg self.logger.info(f"Computed the source offaxis angle from the Fermi pointing direction for a source at RA={src_x}, DEC={src_y}, frame={frame}") except Exception as e: self.logger.error(f"Cannot compute the pointing-target distance: {e}") # Set variable self._fermiTable = visibility_tab # Write file if writeFiles: self.logger.info(f"Writing Visibility table in: {self.outdir}") output_directory = Path(self.outdir).absolute().joinpath("offaxis_data") output_directory.mkdir(exist_ok=True, parents=True) filenamePath = output_directory.joinpath(f"fermi_offaxis_distances_{tmin}_{tmax}") visibility_tab.write(filenamePath.with_suffix(".csv"), format="csv", overwrite=True) self.logger.info(f"Produced: {filenamePath}") self.logger.info(f"Done.") return visibility_tab
[docs] def plotVisibility(self, sourceCoordinates=None, maxOffaxis=None, mjd_limits=None, plotFermi=True, showPositionPanels=False, plotHistogram=True, saveImages=True): """ Plot the offaxis angle of a source from the AGILE and Fermi pointing directions as a function of time, and the histogram of the offaxis angles. Parameters: sourceCoordinates (astropy.coordinates.SkyCoord): the coordinates of the source used to compute the offaxis angle, If None, the offaxis angle must already be computed in the visibility table. maxOffaxis (float): maximum offaxis angle in degrees to be shown in the visibility plot. mjd_limits (tuple(float,float), optional): Plot limits in MJD. plotFermi (bool): if True, plot the Fermi offaxis angle distribution. plotHistogram (bool): if True, plot the histogram of the offaxis angles. saveImage (bool): if True, save the images in the output directory. showPositionPanels (bool): if True, show satellite Direction. Returns: plots (list(str) or None): Plot output paths. """ plots = [] # Get AGILE Data if self._agileTable is None: self.logger.error(f"AGILE Visibility not Computed. Call AGVisibility.computePointingDirection() first.") return plots # Get Offaxis angle if sourceCoordinates is not None: self.logger.warning(f"Overwrite Offaxis Angle, using Source Coordinates: {sourceCoordinates}") pointings = SkyCoord(ra=self._agileTable['AGILE_RA'], dec=self._agileTable['AGILE_DEC'], frame="icrs", unit="deg") separations= sourceCoordinates.separation(pointings) self._agileTable['offaxis_angle_deg'] = separations.to("deg").value self._agileTable.meta["source_ra_deg"] = sourceCoordinates.ra.deg self._agileTable.meta["source_dec_deg"]= sourceCoordinates.dec.deg # Get Fermi if plotFermi: if self._fermiTable is None: self.logger.error(f"Fermi Visibility not Computed. Call AGVisibility.computeFermiPointing() first.") plotFermi=False else: if sourceCoordinates is not None: self.logger.warning(f"Overwrite Offaxis Angle, using Source Coordinates: {sourceCoordinates}") pointings = SkyCoord(ra=self._fermiTable['RA_SCZ'], dec=self._fermiTable['DEC_SCZ'], frame="icrs", unit="deg") separations= sourceCoordinates.separation(pointings) self._fermiTable['offaxis_angle_deg'] = separations.to("deg").value self._fermiTable.meta["source_ra_deg"] = sourceCoordinates.ra.deg self._fermiTable.meta["source_dec_deg"]= sourceCoordinates.dec.deg # Get Data Tables agile_table = self._agileTable fermi_table = self._fermiTable if plotFermi else None source_flag = f"ra{self._agileTable.meta['source_ra_deg']:08.4f}_{self._agileTable.meta['source_dec_deg']:+08.4f}.reg" try: mjd_flag = f"_MJD_{mjd_limits[0]}_{mjd_limits[1]}" except TypeError: mjd_flag = "" # Plot Visibility Time Series filePath = Path(self.outdir).absolute().joinpath(f"plots/offaxis_{source_flag}{mjd_flag}.png") if saveImages else None plot = self.plottingUtils.plotVisibility(agile_table, fermi_table, maxOffaxis, mjd_limits, filePath, showPositionPanels) plots.append(plot) # Plot Visibility Histogram if plotHistogram: filePath = Path(self.outdir).absolute().joinpath(f"plots/offaxis_hist_{source_flag}{mjd_flag}.png") if saveImages else None bins = np.linspace(0,180,19) plot = self.plottingUtils.plotVisibilityHistogram(agile_table, fermi_table, bins, mjd_limits, filePath) plots.append(plot) return plots