Source code for utils.AstroUtils

# DESCRIPTION
#       Agilepy software
#
# NOTICE
#      Any information contained in this software
#      is property of the AGILE TEAM and is strictly
#      private and confidential.
#      Copyright (C) 2005-2020 AGILE Team.
#          Baroncelli Leonardo <leonardo.baroncelli@inaf.it>
#          Addis Antonio <antonio.addis@inaf.it>
#          Bulgarelli Andrea <andrea.bulgarelli@inaf.it>
#          Parmiggiani Nicolò <nicolo.parmiggiani@inaf.it>
#      All rights reserved.

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

import math
import numpy as np
import pandas as pd
from datetime import datetime
from astropy.time import Time

[docs]class AstroUtils:
[docs] @staticmethod def distance(l1, b1, l2, b2): """ Computes the angular distance between two galatic coordinates. Args: l1 (float): longitude of first coordinate b1 (float): latitude of first coordinate l2 (float): longitude of second coordinate b2 (float): latitude of second coordinate Returns: the angular distance between (l1, b1) and (l2, b2) """ if l1 < 0 or l1 > 360 or l2 < 0 or l2 > 360: return -2 elif b1 < -90 or b1 > 90 or b2 < -90 or b2 > 90: return -2 else: d1 = b1 - b2 d2 = l1 - l2 b11 = math.pi / 2.0 - (b1 * math.pi / 180.0) b21 = math.pi / 2.0 - (b2 * math.pi / 180.0) m4 = math.cos(b11) * math.cos(b21) + math.sin(b11) * math.sin(b21) * math.cos(d2 * math.pi / 180.0) if m4 > 1: m4 = 1 try: return math.acos(m4) * 180.0 / math.pi except Exception as e: return math.sqrt(d1 * d1 + d2 * d2)
@staticmethod def evaluate_erglog_from_flux(E, flux, E1, E2, alpha): """ Evaluate SED erglog at energy E assuming power law with index alpha. Energies E, E1, E2 are in MeV. Args: E Returns: """ @staticmethod def evaluate_erglog_from_flux(flux, E1, E2, alpha): """ Given a flux, computed between E1 (MeV) and E2 (MeV) assuming power law with index alpha, Evaluate SED erglog at energy log center between E1 and E2. Args: flux: in ph/cm2/s E1: in MeV E2: in MeV alpha: power law index Returns: SED erglog at energy log center between E1 and E2 """ # MeV to erg factor=0.00000160217733 # Ecsquared factor Ecsquared=E1*E2 # Reference Energy where evaluate SED log10_E = ( np.log10(E1) + np.log10(E2) ) /2.0 E = np.power(10, log10_E) # Evaluate and return SED return flux*np.power(factor, 3-alpha)*np.power(E,2-alpha)*Ecsquared/(E2-E1)
[docs] @staticmethod def AP_filter(filename, threshold, tstart, tstop, outpath): """ This function filters an aperture photometry file using a threshold value for exposure, it discards the rows lower than threshold and returns a new file merging the continous rows. Args: filename (str): path of the aperture photometry file threshold (float): exposure threshold tstart (float): time start in UTC tstop (float): time stop in UTC Returns: A filtered file result.txt """ data = pd.read_csv(filename, index_col=False, names=["tmin_tt", "tmax_tt", "exp", "cts"], sep=" ") data = data[data["exp"] > threshold] data.sort_values("tmin_tt", inplace=True) data["group"] = (data["tmin_tt"] > data["tmax_tt"].shift()).cumsum() results = data.groupby("group").agg({"tmin_tt":"min", "tmax_tt":"max"}) results.to_csv(str(outpath)+"/result.txt", sep=" ", index=False) data = pd.read_csv(str(outpath)+"/result.txt", sep=" ", header=0) data = data[data["tmin_tt"] > tstart] data = data[data["tmax_tt"] < tstop] product = str(outpath)+"/result"+"_"+str(tstart) + "_" + str(tstop)+".txt" data.to_csv(product, sep=" ", index=False) return product
############################################################################ # # Astropy implementation # # https://docs.astropy.org/en/stable/time/index.html # # The time "format" specifies how an instant of time is represented. # https://docs.astropy.org/en/stable/time/index.html#time-format # # The time "scale" (or time standard) is 'a specification for measuring time: # either the rate at which time passes; or points in time; or both' # https://docs.astropy.org/en/stable/time/index.html#time-scale # # UNIX_AGILE_DELTA = 1072915200 # AGILE_TIME = UNIX_TIME - UNIX_AGILE_DELTA # # Supported formats = ["jd", "mjd", "fits", "iso", "unix", "agile seconds since 2004"] UNIX_AGILE_DELTA = 1072915200 # AGILE_DELTA: 2004-01-01 00:00:00.000 # MJD 53005 UNIX_FERMI_DELTA = 978307200 # FERMI_DELTA: 2001-01-01 00:00:00.000 # MJD 51910 ############################ # Generic Conversion functions @staticmethod def convert_time_to_agile_seconds(t): """Convert an astropy time object to AGILE seconds. Args: t (astropy.time.Time): Time Object. Returns: agile_time (float): Time in AGILE TT format. """ agile_time = t.unix - AstroUtils.UNIX_AGILE_DELTA return agile_time @staticmethod def convert_time_from_agile_seconds(time_agile_seconds): """Convert time from AGILE seconds format to an astropy time object. Args: time_agile_seconds (float): Time in AGILE TT format. Returns: t (astropy.time.Time): Time Object. """ time_unix = np.array(time_agile_seconds) + AstroUtils.UNIX_AGILE_DELTA t = Time(time_unix, format="unix") return t @staticmethod def convert_time_to_fermi_seconds(t): """Convert an astropy time object to FERMI seconds. Args: t (astropy.time.Time): Time Object. Returns: fermi_time (float): Time in Fermi MET format. """ fermi_time = t.unix - AstroUtils.UNIX_FERMI_DELTA return fermi_time @staticmethod def convert_time_from_fermi_seconds(time_fermi_seconds): """Convert time from Fermi seconds format to an astropy time object. Args: time_fermi_seconds (float): Time in Fermi MET format. Returns: t (astropy.time.Time): Time Object. """ time_unix = np.array(time_fermi_seconds) + AstroUtils.UNIX_FERMI_DELTA t = Time(time_unix, format="unix") return t @staticmethod def time_fermi_to_agile(fermi_time): """Convert Fermi MET (s since 2001-01-01) to AGILE time (s since 2004-01-01). Args: fermi_time (float): Time in Fermi MET format. Returns: agile_time (float): Time in AGILE TT format. """ AGILE_OFFSET_FROM_FERMI = AstroUtils.UNIX_AGILE_DELTA - AstroUtils.UNIX_FERMI_DELTA agile_time = fermi_time - AGILE_OFFSET_FROM_FERMI return agile_time @staticmethod def time_agile_to_fermi(agile_time): """Convert AGILE time (s since 2004-01-01) to Fermi MET (s since 2001-01-01). Args: agile_time (float): Time in AGILE TT format. Returns: fermi_time (float): Time in Fermi MET format. """ AGILE_OFFSET_FROM_FERMI = AstroUtils.UNIX_AGILE_DELTA - AstroUtils.UNIX_FERMI_DELTA fermi_time = agile_time + AGILE_OFFSET_FROM_FERMI return fermi_time ############################ # Input: JD @staticmethod def time_jd_to_mjd(time_jd): t = Time(time_jd, format="jd") return t.mjd @staticmethod def time_jd_to_fits(time_jd): t = Time(time_jd, format="jd") return t.fits @staticmethod def time_jd_to_iso(time_jd): t = Time(time_jd, format="jd") return t.iso @staticmethod def time_jd_to_unix(time_jd): t = Time(time_jd, format="jd") return np.round(t.unix) @staticmethod def time_jd_to_agile_seconds(time_jd): t = Time(time_jd, format="jd") return np.round(t.unix - AstroUtils.UNIX_AGILE_DELTA) ############################ # Input: AGILE SECONDS @staticmethod def time_agile_seconds_to_jd(time_agile_seconds): time_unix = time_agile_seconds + AstroUtils.UNIX_AGILE_DELTA t = Time(time_unix, format="unix") return t.jd @staticmethod def time_agile_seconds_to_fits(time_agile_seconds): time_unix = time_agile_seconds + AstroUtils.UNIX_AGILE_DELTA t = Time(time_unix, format="unix") return t.fits @staticmethod def time_agile_seconds_to_iso(time_agile_seconds): time_unix = time_agile_seconds + AstroUtils.UNIX_AGILE_DELTA t = Time(time_unix, format="unix") return t.iso @staticmethod def time_agile_seconds_to_unix(time_agile_seconds): time_unix = np.array(time_agile_seconds) + AstroUtils.UNIX_AGILE_DELTA return np.round(time_unix) @staticmethod def time_agile_seconds_to_mjd(time_agile_seconds): time_unix = np.array(time_agile_seconds) + AstroUtils.UNIX_AGILE_DELTA t = Time(time_unix, format="unix") return t.mjd ############################ # Input: MJD @staticmethod def time_mjd_to_jd(time_mjd): t = Time(time_mjd, format="mjd") return t.jd @staticmethod def time_mjd_to_fits(time_mjd): t = Time(time_mjd, format="mjd") return t.fits @staticmethod def time_mjd_to_iso(time_mjd): t = Time(time_mjd, format="mjd") return t.iso @staticmethod def time_mjd_to_unix(time_mjd): t = Time(time_mjd, format="mjd") return np.round(t.unix) @staticmethod def time_mjd_to_agile_seconds(time_mjd): t = Time(time_mjd, format="mjd") return np.round(t.unix - AstroUtils.UNIX_AGILE_DELTA) ############################ # Input: ISO @staticmethod def time_iso_to_jd(time_iso): t = Time(time_iso, format="iso") return t.jd @staticmethod def time_iso_to_fits(time_iso): t = Time(time_iso, format="iso") return t.fits @staticmethod def time_iso_to_unix(time_iso): t = Time(time_iso, format="iso") return np.round(t.unix) @staticmethod def time_iso_to_agile_seconds(time_iso): t = Time(time_iso, format="iso") return np.round(t.unix - AstroUtils.UNIX_AGILE_DELTA) @staticmethod def time_iso_to_mjd(time_iso): t = Time(time_iso, format="iso") return t.mjd ############################ # Input: FITS @staticmethod def time_fits_to_jd(time_fits): t = Time(time_fits, format="fits") return t.jd @staticmethod def time_fits_to_mjd(time_fits): t = Time(time_fits, format="fits") return t.mjd @staticmethod def time_fits_to_iso(time_fits): t = Time(time_fits, format="fits") return t.iso @staticmethod def time_fits_to_unix(time_fits): t = Time(time_fits, format="fits") return np.round(t.unix) @staticmethod def time_fits_to_agile_seconds(time_fits): t = Time(time_fits, format="fits") return np.round(t.unix - AstroUtils.UNIX_AGILE_DELTA) ############################ # Input: UNIX @staticmethod def time_unix_to_jd(time_unix): t = Time(time_unix, format="unix") return t.jd @staticmethod def time_unix_to_mjd(time_unix): t = Time(time_unix, format="unix") return t.mjd @staticmethod def time_unix_to_fits(time_unix): t = Time(time_unix, format="unix") return t.fits @staticmethod def time_unix_to_iso(time_unix): t = Time(time_unix, format="unix") return t.iso @staticmethod def time_unix_to_agile_seconds(time_unix): t = Time(time_unix, format="unix") return np.round(t.unix - AstroUtils.UNIX_AGILE_DELTA) ############### @staticmethod def li_ma(n_on:int, n_off:int, alpha:float): """Compute Li&Ma Significance if enough counts are provided. Args: n_on (int): ON counts. n_off (int): OFF counts. alpha (float): Exposure Ratio ON / OFF region. Returns: significance (float): Li&Ma Significance """ if n_on < 10 or n_off < 10 or alpha == 0: return 0.0 fc = (1 + alpha) / alpha fb = n_on / (n_on + n_off) f = fc * fb gc = 1 + alpha gb = n_off / (n_on + n_off) g = gc * gb first = n_on * np.log(f) second = n_off * np.log(g) fullb = first + second significance = np.sqrt(2) * np.sqrt(fullb) return significance