diff --git a/docs/change_history.rst b/docs/change_history.rst index cbfa731f..a4c6281b 100644 --- a/docs/change_history.rst +++ b/docs/change_history.rst @@ -11,6 +11,8 @@ V1.3.0 Unreleased + `--max-targets` was not being used, missed connection in `MainApp`. - Updated keyword values of all reference lamps in the library according to [#292] +- Refactored `wavelength.WavelengthCalibration` class moving several methods to + `core` [#300, #303] .. _v1.2.1: diff --git a/environment.yml b/environment.yml index a862aa21..1b7c8645 100644 --- a/environment.yml +++ b/environment.yml @@ -13,5 +13,6 @@ dependencies: - ccdproc>=1.3.0.post1 - coveralls - astroplan + - mock - sphinx - sphinxcontrib.napoleon diff --git a/goodman_pipeline/core/core.py b/goodman_pipeline/core/core.py index d5fb5425..6cada85c 100644 --- a/goodman_pipeline/core/core.py +++ b/goodman_pipeline/core/core.py @@ -508,7 +508,6 @@ def create_master_flats(flat_files, log.info('Creating Master Flat') for flat_file in flat_files: - # print(f_file) image_full_path = os.path.join(raw_data, flat_file) ccd = read_fits(image_full_path, technique=technique) log.debug('Loading flat image: ' + image_full_path) @@ -560,7 +559,6 @@ def create_master_flats(flat_files, log.info('Created Master Flat: ' + master_flat_name) return master_flat, master_flat_name - # print(master_flat_name) else: log.error('Empty flat list. Check that they do not exceed the ' 'saturation limit.') @@ -568,7 +566,7 @@ def create_master_flats(flat_files, def cross_correlation(reference, - new_array, + compared, slit_size, serial_binning, mode='full', @@ -585,7 +583,7 @@ def cross_correlation(reference, Args: reference (array): Reference array. - new_array (array): Array to be matched. A new reference lamp. + compared (array): Array to be matched. A new reference lamp. slit_size (float): Slit width in arcseconds serial_binning (int): Binning in the spectral axis mode (str): Correlation mode for `scipy.signal.correlate`. @@ -595,7 +593,7 @@ def cross_correlation(reference, correlation_value (int): Shift value in pixels. """ - cyaxis2 = new_array + cyaxis2 = compared if slit_size > 3: box_width = slit_size / (0.15 * serial_binning) @@ -612,7 +610,7 @@ def cross_correlation(reference, gaussian_kernel = Gaussian1DKernel(stddev=kernel_stddev) cyaxis1 = convolve(reference, gaussian_kernel) - cyaxis2 = convolve(new_array, gaussian_kernel) + cyaxis2 = convolve(compared, gaussian_kernel) ccorr = signal.correlate(cyaxis1, cyaxis2, mode=mode) @@ -623,7 +621,7 @@ def cross_correlation(reference, len(ccorr)) correlation_value = x_ccorr[max_index] - if plot: + if plot: # pragma: no cover plt.ion() plt.title('Cross Correlation') plt.xlabel('Lag Value') @@ -835,9 +833,6 @@ def combine_data(image_list, dest_path, prefix=None, output_name=None, field) combined_base_name += "{:s}_".format(value) - - # print(combined_base_name) - number = len(glob.glob( os.path.join(dest_path, combined_base_name + "*.fits"))) @@ -936,8 +931,6 @@ def dcr_cosmicray_rejection(data_path, in_file, prefix, dcr_par_dir, log.debug('DCR command:') log.debug(command) - # print(command.split(' ')) - # get the current working directory to go back to it later in case the # the pipeline has not been called from the same data directory. cwd = os.getcwd() @@ -945,19 +938,6 @@ def dcr_cosmicray_rejection(data_path, in_file, prefix, dcr_par_dir, # move to the directory were the data is, dcr is expecting a file dcr.par os.chdir(data_path) - # check if file dcr.par exists - while not os.path.isfile('dcr.par'): - - log.warning('File dcr.par does not exist. Copying default one.') - dcr_par_path = os.path.join(dcr_par_dir, 'dcr.par') - log.debug('dcr.par full path: {:s}'.format(dcr_par_path)) - if os.path.isfile(dcr_par_path): - shutil.copy2(dcr_par_path, data_path) - else: - log.error('Could not find dcr.par file') - else: - log.debug('File dcr.par exists.') - # call dcr try: @@ -975,7 +955,7 @@ def dcr_cosmicray_rejection(data_path, in_file, prefix, dcr_par_dir, # if the process is taking too long to respond, kill it # kill_process = lambda process: process.kill() - def kill_process(process): + def kill_process(process): # pragma: no cover log.error("DCR Timed out") process.kill() @@ -1214,10 +1194,6 @@ def extract_fractional_pixel(ccd, target_trace, target_fwhm, extraction_width, # this defines the extraction limit for every column low_limit = trace_points[i] - 0.5 * extraction_width * target_fwhm high_limit = trace_points[i] + 0.5 * extraction_width * target_fwhm - # print(trace_points[i], extraction_width, target_stddev) - - # low_limits_list.append(low_limit) - # high_limits_list.append(high_limit) if apnum1 is None: # TODO (simon): add secondary targets @@ -1258,12 +1234,10 @@ def extract_fractional_pixel(ccd, target_trace, target_fwhm, extraction_width, # low_background_zone high_1 = low_limit - background_spacing * target_fwhm low_1 = high_1 - background_width - # print(low_1,high_1) # High background zone low_2 = high_limit + background_spacing * target_fwhm high_2 = low_2 + background_width - # print(low_1, ' to ', high_1, ':', low_2, ' to ', high_2,) # validate background subtraction zones background_1 = None @@ -1281,7 +1255,7 @@ def extract_fractional_pixel(ccd, target_trace, target_fwhm, extraction_width, low_limit=low_1, high_limit=high_1) else: - print("Invalid Zone 1") + log.error("Invalid Zone 1: [{}:{}]".format(low_1, high_1)) if high_2 < spat_length and neighbouring_target_condition: background_2 = fractional_sum(data=ccd.data, @@ -1289,7 +1263,7 @@ def extract_fractional_pixel(ccd, target_trace, target_fwhm, extraction_width, low_limit=low_2, high_limit=high_2) else: - print("Invalid Zone 2") + log.error("Invalid Zone 2: [{}:{}]".format(low_2, high_2)) # background = 0 if background_1 is not None and background_2 is None: @@ -1425,7 +1399,6 @@ def fractional_sum(data, index, low_limit, high_limit): data[int(low_integer), index] * low_fraction + \ data[int(high_integer), index] * high_fraction - # print(low_limit, high_limit, column_sum) return column_sum @@ -1529,7 +1502,6 @@ def get_lines_in_lamp(ccd, plots=False): """ if isinstance(ccd, CCDData): - # print(ccddata_lamp.data.shape) lamp_data = ccd.data lamp_header = ccd.header raw_pixel_axis = range(len(lamp_data)) @@ -1558,7 +1530,6 @@ def get_lines_in_lamp(ccd, plots=False): new_order = int(round(float(slit_size) / (0.15 * serial_binning))) log.debug('New Order: {:d}'.format(new_order)) - # print(round(new_order)) peaks = signal.argrelmax(filtered_data, axis=0, order=new_order)[0] if slit_size >= 5.: @@ -1572,10 +1543,8 @@ def get_lines_in_lamp(ccd, plots=False): lines_center = recenter_lines(no_nan_lamp_data, peaks) if plots: # pragma: no cover - # print(new_order, slit_size, ) plt.close('all') fig, ax = plt.subplots() - # ax = fig.add_subplot(111) fig.canvas.set_window_title('Lines Detected') mng = plt.get_current_fig_manager() @@ -2149,15 +2118,11 @@ def linearize_spectrum(data, wavelength_solution, plots=False): and the smoothed linearized data itself. """ - # for data_point in data: - # print(data_point) - # print('data ', data) pixel_axis = range(len(data)) - if np.nan in data: + if any(np.isnan(data)): log.error("there are nans") sys.exit(0) - # print(pixel_axis) if wavelength_solution is not None: x_axis = wavelength_solution(pixel_axis) try: @@ -2172,7 +2137,6 @@ def linearize_spectrum(data, wavelength_solution, plots=False): der=0) smoothed_linearized_data = signal.medfilt(linearized_data) - # print('sl ', smoothed_linearized_data) if plots: # pragma: no cover fig6 = plt.figure(6) plt.xlabel('Wavelength (Angstrom)') @@ -2271,21 +2235,17 @@ def name_master_flats(header, date_obs = datetime.datetime.strptime(header['DATE-OBS'], "%Y-%m-%dT%H:%M:%S.%f") - # print(sunset, date_obs, evening_twilight) - # print(' ') if target_name != '': target_name = '_' + target_name if not get: # TODO (simon): There must be a pythonic way to do this - if (date_obs < sunset) or (date_obs > sunrise): - dome_sky = '_dome' + if afternoon_twilight < date_obs < morning_twilight: + dome_sky = '_night' elif (sunset < date_obs < afternoon_twilight) or \ (morning_twilight < date_obs < sunrise): dome_sky = '_sky' - elif afternoon_twilight < date_obs < morning_twilight: - dome_sky = '_night' else: - dome_sky = '_other' + dome_sky = '_dome' else: dome_sky = '*' @@ -2302,7 +2262,7 @@ def name_master_flats(header, flat_grating = '_no_grating' wavmode = '' - flat_slit = re.sub('[A-Za-z" ]', + flat_slit = re.sub('[A-Za-z_ ]', '', header['SLIT']) @@ -2322,9 +2282,10 @@ def name_master_flats(header, + '.fits' elif technique == 'Imaging': - flat_filter = re.sub('-', '_', header['FILTER']) + flat_filter = re.sub('[- ]', '_', header['FILTER']) + flat_filter = re.sub('[<> ]', '', flat_filter) master_flat_name += '_' + flat_filter + dome_sky + '.fits' - # print(master_flat_name) + return master_flat_name @@ -2868,7 +2829,6 @@ def search_comp_group(object_group, comp_groups, reference_data): (comp_group['filter2'] == object_confs.iloc[0]['filter2'] )).all(): if reference_data.check_comp_group(comp_group) is not None: - # print(comp_group) log.debug('Found a matching comparison lamp group') return comp_group @@ -3008,28 +2968,24 @@ def trace(ccd, lower_limit_list.append(lower_limit) upper_limit_list.append(upper_limit) - # print(sample_center, nsigmas, model_fwhm, lower_limit, upper_limit) - sample = ccd.data[lower_limit:upper_limit, point:point + sampling_step] sample_median = np.median(sample, axis=1) try: sample_peak = np.argmax(sample_median) - # print(sample_peak + lower_limit) except ValueError: # pragma: no cover - # plt.plot(model(range(spatial_length))) - # plt.plot(ccd.data[:,point]) - # plt.show() - print('Nfwhm ', nfwhm) - print('Model Stddev ', model_fwhm) - print('sample_center ', sample_center) - print('sample ', sample) - print('sample_median ', sample_median) - print('lower_limit ', lower_limit) - print('upper_limit ', upper_limit) - print('point ', point) - print('point + sampling_step ', point + sampling_step) - print(spatial_length, dispersion_length) + log.error('Nfwhm {}'.format(nfwhm)) + log.error('Model Stddev {}'.format(model_fwhm)) + log.error('sample_center {}'.format(sample_center)) + log.error('sample {}'.format(sample)) + log.error('sample_median {}'.format(sample_median)) + log.error('lower_limit {}'.format(lower_limit)) + log.error('upper_limit {}'.format(upper_limit)) + log.error('point {}'.format(point)) + log.error('point + sampling_step {}'.format(point + sampling_step)) + log.error("Spatial length: {}, Dispersion length {}".format( + spatial_length, + dispersion_length)) sys.exit() sample_values.append(sample_peak + lower_limit) @@ -3208,8 +3164,6 @@ def trace_targets(ccd, target_list, sampling_step=5, pol_deg=2, nfwhm=5, sampling_step=sampling_step, nfwhm=nfwhm, plots=plots) - # print(single_trace) - # print(trace_rms) if 0 < single_trace.c0.value < ccd.shape[0]: log.debug('Adding trace to list') @@ -3463,7 +3417,6 @@ def __repr__(self): self.full_path, self.instrument, self.technique)) - # print(class_info) if all([self.gain, self.rdnoise, self.roi]): class_info += str("\nGain: {:.2f}\n" @@ -3636,7 +3589,6 @@ def add_spec_group(self, spec_group): self.is_empty = False comp_group = spec_group[spec_group.obstype == 'COMP'] self.add_comp_group(comp_group=comp_group) - # print(comp_group) def set_sun_times(self, sun_set, sun_rise): """Sets values for sunset and sunrise @@ -3722,10 +3674,10 @@ def __init__(self, reference_dir): self.reference_dir = reference_dir reference_collection = ccdproc.ImageFileCollection(self.reference_dir) self.ref_lamp_collection = reference_collection.summary.to_pandas() - # print(self.ref_lamp_collection) self.lines_pixel = None self.lines_angstrom = None self._ccd = None + self.nist = {} self.lamp_status_keywords = [ 'LAMP_HGA', 'LAMP_NE', @@ -3879,7 +3831,6 @@ def check_comp_group(self, comp_group): (comp_group['lamp_ar'] == lamps.iloc[i]['lamp_ar']) & (comp_group['lamp_fe'] == lamps.iloc[i]['lamp_fe']) & (comp_group['lamp_cu'] == lamps.iloc[i]['lamp_cu'])] - # print(new_group.file) return new_group else: self.log.warning("The target's comparison lamps do not have " @@ -3917,39 +3868,18 @@ def _recover_lines(self): angstrom_key, float(self._ccd.header[angstrom_key]))) - def _validate_lines(self): - """Calls all available validation methods - - Notes: - Line existence validation is not being taken into consideration - since the method to prove existence is not fully developed yet. - - Returns: - True if none of the validation fails. - """ - assert len(self.lines_pixel) == len(self.lines_angstrom) - if not self._order_validation(self.lines_pixel): - return False - if not self._order_validation(self.lines_angstrom): - return False - # if not self._validate_line_existence(): - # return False - self._validate_line_existence() - return True - @staticmethod def _order_validation(lines_array): """Checks that the array of lines only increases.""" previous = None for line_value in lines_array: - # print(line_value) if previous is not None: try: assert line_value > previous previous = line_value except AssertionError: - print("Error: Line {:f} is not larger " - "than {:f}".format(line_value, previous)) + log.error("Error: Line {:f} is not larger " + "than {:f}".format(line_value, previous)) return False else: previous = line_value @@ -3972,42 +3902,6 @@ def _load_nist_list(self, **kwargs): 'reference']) self.nist[key] = nist_data - def _validate_line_existence(self): - """Check if a line actually exists in any table of NIST - - Notes: - It does not actually check NIST, it loads six csv tables - from NIST's strong lines for the elements used in lamps: - Ar, Cu, Fe, He, Hg, Ne. It does not work perfect so far - so the it is not checking existence actually but if it finds it - it will get the value at "spectrum" column in NIST tables which - correspond to the source of the line, for instance Ne I, Ar II, etc. - - """ - - lamp_elements = [] - lamp_name = self._ccd.header['OBJECT'] - if len(lamp_name) % 2 == 0: - for element_index in range(0, len(lamp_name), 2): - element = lamp_name[element_index:element_index + 2] - lamp_elements.append(element) - - if self.nist is None: - self.nist = {} - self._load_nist_list() - - self.spectrum = list(self.lines_angstrom) - for i in range(len(self.lines_angstrom)): - for element in lamp_elements: - line_info = self.nist[element][ - self.nist[element].air_wavelength == self.lines_angstrom[i]] - if line_info.empty: - # print(self.lines_angstrom[i], 'no-info') - self.spectrum[i] = '' - else: - self.spectrum[i] = line_info['spectrum'].to_string( - index=False) - class SaturationValues(object): """Contains a complete table of readout modes and 50% half well @@ -4172,7 +4066,6 @@ def __call__(self, camera_targ = str(header['cam_targ']) grating_targ = str(header['grt_targ']) blocking_filter = str(header['filter2']) - # print(grating, camera_targ, grating_targ, blocking_filter) return self.get_mode(grating=grating, camera_targ=camera_targ, @@ -4206,8 +4099,6 @@ def get_mode(self, grating, camera_targ, grating_targ, blocking_filter): string that defines the wavelength mode used """ - - # print(grating, camera_targ, grating_targ) if any(grat == grating for grat in ('1800', '2100', '2400')): central_wavelength = get_central_wavelength(grating=grating, grt_ang=grating_targ, @@ -4766,7 +4657,6 @@ def _fit_gaussian(fitter, ax.plot(spatial_profile, color='k', label='Median Profile') for profile in profile_model: - # print(profile_model) ax.plot(profile(range(len(spatial_profile))), label=profile.name) diff --git a/goodman_pipeline/core/tests/test_check_version.py b/goodman_pipeline/core/tests/test_check_version.py index 76976e86..5bfbbd51 100644 --- a/goodman_pipeline/core/tests/test_check_version.py +++ b/goodman_pipeline/core/tests/test_check_version.py @@ -29,6 +29,13 @@ def test_get_last_no_token(self): except KeyError: # pragma: no cover pass + def test_get_last_token(self): + os.environ['FAKETOKEN'] = 'ThisIsNotARealToken' + self.assertRaises(ConnectionRefusedError, + check_version.get_last, + 'FAKETOKEN') + + def test_am_i_updated(self): try: self.assertTrue(check_version.am_i_updated(__version__)) diff --git a/goodman_pipeline/core/tests/test_core.py b/goodman_pipeline/core/tests/test_core.py index a16a00bc..eefc8c54 100644 --- a/goodman_pipeline/core/tests/test_core.py +++ b/goodman_pipeline/core/tests/test_core.py @@ -9,6 +9,7 @@ fitting) import astropy.units as u import collections +import mock import numpy as np import os import pandas @@ -41,6 +42,7 @@ convert_time, create_master_bias, create_master_flats, + cross_correlation, dcr_cosmicray_rejection, define_trim_section, extraction, @@ -61,6 +63,7 @@ image_trim, interpolate, is_file_saturated, + linearize_spectrum, name_master_flats, normalize_master_flat, ra_dec_to_deg, @@ -74,33 +77,13 @@ validate_ccd_region, write_fits) -# class ExceptionHandling(TestCase): -# -# def test_critical_error(self): -# self.assertRaises(CriticalError) -# -# -# def test_night_data_container(self): -# pass -# -# -# def test_no_match_found(self): -# pass -# -# -# def test_not_enough_lines_detected(self): -# pass -# -# -# def test_no_target_exception(self): -# pass - - -def test_spectroscopic_mode(): - pass - -def test_lacosmic_cosmicray_rejection(): - pass + +def fake_subprocess_popen(*args, stdout, stderr): + raise OSError + + +def fake_dcr_communicate_stderr_no_dcr(): + return b'some message', b'dcr: not found' class AddLinearWavelengthSolutionTest(TestCase): @@ -201,198 +184,24 @@ def test__bin_reference_data(self): self.assertEqual(len(new_wavelength), np.floor(len(wavelength) / i)) -class CrossCorrelationTest(TestCase): - - @skip - def test__cross_correlation(self): - self.wc.lamp = self.ccd.copy() - self.wc.serial_binning = 1 - - x_axis = np.arange(0, 4060, 1) - - reference = np.zeros(4060) - gaussian = models.Gaussian1D(stddev=2) - - for i in sorted(np.random.choice(x_axis, 30)): - gaussian.mean.value = i - reference += gaussian(x_axis) - - offset = np.random.choice(range(1, 15), 1)[0] - - for slit in [1, 2, 3, 4, 5]: - - new_array = np.append(reference[offset:], np.zeros(offset)) - - if slit > 3: - box_kernel = Box1DKernel(width=slit / 0.15) - new_array = convolve(new_array, box_kernel) - - self.assertEqual(len(reference), len(new_array)) - - self.wc.lamp.header['SLIT'] = '{:d}.0" long slit'.format(slit) - - correlation_value = self.wc._cross_correlation(reference=reference, - new_array=new_array) - self.assertEqual(correlation_value, offset) - -class InterpolationTest(TestCase): - - def test_interpolate(self): - initial_array = np.sin(np.arange(0, 3 * np.pi)) - initial_length = len(initial_array) - - new_x_axis, new_array = interpolate(spectrum=initial_array, - interpolation_size=100) - - self.assertEqual(len(new_x_axis), len(new_array)) - self.assertEqual(len(new_array), initial_length * 100) - - -class GenerateDcrFile(TestCase): - - def setUp(self): - self.create = GenerateDcrParFile() - self.ccd = CCDData(data=np.ones((100, 100)), - meta=fits.Header(), - unit='adu') - self.ccd.header.set('INSTCONF', value='Red') - self.ccd.header.set('CCDSUM', value='1 1') - - def test_generate_dcr_par_file(self): - serial, parallel = self.ccd.header['CCDSUM'].split() - instrument = self.ccd.header['INSTCONF'] - - self.assertEqual(serial, '1') - self.assertEqual(instrument, 'Red') - self.assertEqual(self.create._file_name, 'dcr.par') - self.assertIsInstance(self.create._df, pandas.DataFrame) - - self.assertFalse(os.path.isfile(self.create._file_name)) - self.create() - self.assertTrue(os.path.isfile(self.create._file_name)) - - self.assertRaises(AssertionError, self.create, 'Green') - - def tearDown(self): - if os.path.isfile(self.create._file_name): - os.remove(self.create._file_name) - - -class GetLinesInLampTest(TestCase): - - @skip - def test_get_lines_in_lamp(self): - pass - - -class GetSpectralCharacteristicsTest(TestCase): - def setUp(self): - self.ccd = CCDData(data=np.random.random_sample(200), - meta=fits.Header(), - unit='adu') - self.pixel_size = 15 * u.micrometer - self.goodman_focal_length = 377.3 * u.mm - - def test__get_spectral_characteristics(self): - self.ccd.header.set('SLIT', '1.0_LONG_SLIT') - self.ccd.header.set('GRATING', 'SYZY_400') - self.ccd.header.set('GRT_ANG', 7.5) - self.ccd.header.set('CAM_ANG', 16.1) - self.ccd.header.set('CCDSUM', '1 1') - - spec_charact = get_spectral_characteristics( - ccd=self.ccd, - pixel_size=self.pixel_size, - instrument_focal_length=self.goodman_focal_length) - - self.assertIsInstance(spec_charact, dict) - self.assertEqual(len(spec_charact), 7) -class MasterFlatTest(TestCase): +class CentralWavelength(TestCase): def setUp(self): - # create a master flat - self.master_flat = CCDData(data=np.ones((100, 100)), - meta=fits.Header(), - unit='adu') - self.master_flat.header.set('GRATING', value='RALC_1200-BLUE') - self.master_flat.header.set('SLIT', value='0.84" long slit') - self.master_flat.header.set('FILTER2', value='') - self.master_flat.header.set('WAVMODE', value='1200 m2') - self.master_flat_name = 'master_flat_1200m2.fits' - # expected master flat to be retrieved by get_best_flat - self.reference_flat_name = 'master_flat_1200m2_0.84_dome.fits' - # location of sample flats - self.flat_path = 'goodman_pipeline/data/test_data/master_flat' - slit = re.sub('[A-Za-z" ]', - '', - self.master_flat.header['SLIT']) - self.flat_name_base = re.sub('.fits', - '_' + slit + '*.fits', - self.master_flat_name) - - # save a master flat with some random structure. - - self.master_flat_name_norm = 'flat_to_normalize.fits' - # add a bias level - self.master_flat.data += 300. - # add noise - self.master_flat.data += np.random.random_sample( - self.master_flat.data.shape) - - self.master_flat.write(os.path.join(self.flat_path, - self.master_flat_name_norm), - overwrite=False) - - def tearDown(self): - full_path = os.path.join(self.flat_path, - self.master_flat_name_norm) - - self.assertTrue(os.path.isfile(full_path)) - if os.path.isfile(full_path): - os.unlink(full_path) - self.assertFalse(os.path.isfile(full_path)) - - # remove normalized flat - norm_flat = re.sub('flat_to_', 'norm_flat_to_', full_path) - if os.path.isfile(norm_flat): - os.unlink(norm_flat) - self.assertFalse(os.path.isfile(norm_flat)) - - def test_get_best_flat(self): - # print(self.flat_name_base) - - master_flat, master_flat_name = get_best_flat( - flat_name=self.flat_name_base, - path=self.flat_path) - self.assertIsInstance(master_flat, CCDData) - self.assertEqual(os.path.basename(master_flat_name), - self.reference_flat_name) - - def test_get_best_flat_fail(self): - # Introduce an error that will never produce a result. - wrong_flat_name = re.sub('1200m2', '1300m2', self.flat_name_base) - master_flat, master_flat_name = get_best_flat( - flat_name=wrong_flat_name, - path=self.flat_path) - self.assertIsNone(master_flat) - self.assertIsNone(master_flat_name) - - def test_normalize_master_flat(self): - methods = ['mean', 'simple', 'full'] - for method in methods: - self.assertNotAlmostEqual(self.master_flat.data.mean(), 1.) - normalized_flat, normalized_flat_name = normalize_master_flat( - master=self.master_flat, - name=os.path.join(self.flat_path, - self.master_flat_name_norm), - method=method) + # 400m2 + self.grating = '400' + self.grating_angle = 7.5 + self.camera_angle = 16.1 + self.reference_central_wavelength = 7001.54 * u.angstrom - self.assertAlmostEqual(normalized_flat.data.mean(), 1., - delta=0.001) - self.assertEqual(normalized_flat.header['GSP_NORM'], method) - self.assertIn('norm_', normalized_flat_name) + def test_get_central_wavelength(self): + central_wavelength = get_central_wavelength(grating=self.grating, + grt_ang=self.grating_angle, + cam_ang=self.camera_angle) + self.assertAlmostEqual(central_wavelength.value, + self.reference_central_wavelength.value, + places=2) class ClassifySpectroscopicData(TestCase): @@ -475,23 +284,65 @@ def test_classify_spectroscopic_data(self): self.assertFalse(result.is_empty) -class CentralWavelength(TestCase): +class CombineDataTest(TestCase): def setUp(self): - # 400m2 - self.grating = '400' - self.grating_angle = 7.5 - self.camera_angle = 16.1 - self.reference_central_wavelength = 7001.54 * u.angstrom + self.ccd1 = CCDData(data=np.ones((100, 100)), + meta=fits.Header(), + unit='adu') + self.ccd1.header.set('OBJECT', value='TestObject') + self.ccd1.header.set('GRATING', value='Grating') + self.ccd1.header.set('SLIT', value='1.05SlitSize') - def test_get_central_wavelength(self): - central_wavelength = get_central_wavelength(grating=self.grating, - grt_ang=self.grating_angle, - cam_ang=self.camera_angle) - self.assertAlmostEqual(central_wavelength.value, - self.reference_central_wavelength.value, - places=2) + self.ccd2 = self.ccd1.copy() + self.ccd2.data *= 2 + self.ccd3 = self.ccd1.copy() + self.ccd3.data *= 5 + self.ccd1.header.set('GSP_FNAM', value='image_1.fits') + self.ccd2.header.set('GSP_FNAM', value='image_2.fits') + self.ccd3.header.set('GSP_FNAM', value='image_3.fits') + self.image_list = [self.ccd1, self.ccd2, self.ccd3] + self.dest_path = os.getcwd() + self.prefix = 'testing_' + self.output_name = 'combine_data.fits' + + def tearDown(self): + using_output_name_file_name = os.path.join( + self.dest_path, + self.output_name) + if os.path.isfile(using_output_name_file_name): + os.unlink(using_output_name_file_name) + + not_using_output_name_file_name_list = os.listdir(self.dest_path) + if not_using_output_name_file_name_list: + for _file in not_using_output_name_file_name_list: + if '{:s}combined'.format(self.prefix) in _file: + os.unlink(_file) + + def test_combine_data_median_prefix_ignored(self): + combined = combine_data( + image_list=self.image_list, + dest_path=self.dest_path, + prefix=self.prefix, + output_name=self.output_name, + method='median', + save=True) + + np.testing.assert_array_equal(combined.data, np.ones((100, 100)) * 1.5) + self.assertEqual(len(combined.header['GSP_IC*']), 3) + self.assertTrue(self.prefix not in combined.header['GSP_FNAM']) + + def test_combine_data_median_prefix_used(self): + combined = combine_data( + image_list=self.image_list, + dest_path=self.dest_path, + prefix=self.prefix, + method='median', + save=True) + np.testing.assert_array_equal(combined.data, np.ones((100, 100)) * 1.5) + self.assertEqual(len(combined.header['GSP_IC*']), 3) + self.assertTrue(self.prefix in combined.header['GSP_FNAM']) @@ -529,7 +380,7 @@ def test_call_cosmic_rejection_default_1x1(self): keep_files=True, prefix=prefix, method='default', - save=True) + save=False) self.assertAlmostEqual(initial_value, ccd.data[50, 50]) self.assertEqual(out_prefix, prefix + self.out_prefix) self.assertEqual(ccd.header['GSP_FNAM'], @@ -537,12 +388,36 @@ def test_call_cosmic_rejection_default_1x1(self): self.assertEqual(ccd.header['GSP_COSM'], 'DCR') self.assertTrue(os.path.isfile('dcr.par')) - self.assertTrue(os.path.isfile('new_prefixcr_test.fits')) - def test_call_cosmic_rejection_default_2x2(self): - self.ccd.header.set('CCDSUM', value='2 2') - prefix = 'new_' - initial_value = self.ccd.data[50, 50] + @mock.patch('subprocess.Popen', side_effect=fake_subprocess_popen) + def test_call_cosmic_rejection_default_1x1_no_dcr_par(self, + subprocess_Popen_function): + + self.assertRaises(SystemExit, + call_cosmic_rejection, + self.ccd, + self.file_name, + self.out_prefix, + self.red_path, + os.getcwd()) + + @mock.patch('subprocess.Popen.communicate', + side_effect=fake_dcr_communicate_stderr_no_dcr) + def test_dcr_cosmicray_rejection_no_dcr_executable( + self, subprocess_Popen_communicate): + + self.assertRaises(SystemExit, + dcr_cosmicray_rejection, + self.red_path, + self.file_name, + 'c', + os.getcwd()) + + + def test_call_cosmic_rejection_default_2x2(self): + self.ccd.header.set('CCDSUM', value='2 2') + prefix = 'new_' + initial_value = self.ccd.data[50, 50] self.ccd.data[50, 50] = 50000 ccd, out_prefix = call_cosmic_rejection(ccd=self.ccd, @@ -645,88 +520,304 @@ def tearDown(self): os.unlink(_file) -class CombineDataTest(TestCase): +class CreateMasterBias(TestCase): def setUp(self): - self.ccd1 = CCDData(data=np.ones((100, 100)), + self.name = '' + self.name_2 = '' + self.bias_files = ['bias_{}.fits'.format(i) for i in range(1, 12)] + self.raw_data = os.getcwd() + self.reduced_data = os.getcwd() + + self.overscan_region = '[1:10,1:100]' + self.trim_section = '[11:90,11:90]' + + self.ccd = CCDData(data=np.ones((100, 100)), meta=fits.Header(), unit='adu') - self.ccd1.header.set('OBJECT', value='TestObject') - self.ccd1.header.set('GRATING', value='Grating') - self.ccd1.header.set('SLIT', value='1.05SlitSize') - self.ccd2 = self.ccd1.copy() - self.ccd2.data *= 2 - self.ccd3 = self.ccd1.copy() - self.ccd3.data *= 5 - self.ccd1.header.set('GSP_FNAM', value='image_1.fits') - self.ccd2.header.set('GSP_FNAM', value='image_2.fits') - self.ccd3.header.set('GSP_FNAM', value='image_3.fits') - self.image_list = [self.ccd1, self.ccd2, self.ccd3] - self.dest_path = os.getcwd() - self.prefix = 'testing_' - self.output_name = 'combine_data.fits' + self.ccd.data[11:90, 10:90] += 1 + self.ccd.header.set('CCDSUM', value='1 1') + + for _file in self.bias_files: + self.ccd.write(os.path.join(self.reduced_data, _file)) def tearDown(self): - using_output_name_file_name = os.path.join( - self.dest_path, - self.output_name) - if os.path.isfile(using_output_name_file_name): - os.unlink(using_output_name_file_name) + for _file in self.bias_files: + os.unlink(os.path.join(self.reduced_data, _file)) - not_using_output_name_file_name_list = os.listdir(self.dest_path) - if not_using_output_name_file_name_list: - for _file in not_using_output_name_file_name_list: - if '{:s}combined'.format(self.prefix) in _file: - os.unlink(_file) + if self.name != '' and self.name_2 != '': + for _file in [self.name, self.name_2]: + os.unlink(_file) - def test_combine_data_median_prefix_ignored(self): - combined = combine_data( - image_list=self.image_list, - dest_path=self.dest_path, - prefix=self.prefix, - output_name=self.output_name, - method='median', - save=True) + def test_create_master_bias(self): + master, self.name = create_master_bias( + bias_files=self.bias_files, + raw_data=self.raw_data, + reduced_data=self.reduced_data, + technique='Spectroscopy', + overscan_region=self.overscan_region, + trim_section=self.trim_section) - np.testing.assert_array_equal(combined.data, np.ones((100, 100)) * 1.5) - self.assertEqual(len(combined.header['GSP_IC*']), 3) - self.assertTrue(self.prefix not in combined.header['GSP_FNAM']) + self.assertTrue('master_bias' in self.name) - def test_combine_data_median_prefix_used(self): - combined = combine_data( - image_list=self.image_list, - dest_path=self.dest_path, - prefix=self.prefix, - method='median', - save=True) + master_2, self.name_2 = create_master_bias( + bias_files=self.bias_files, + raw_data=self.raw_data, + reduced_data=self.reduced_data, + technique='Spectroscopy', + overscan_region=self.overscan_region, + trim_section=self.trim_section) - np.testing.assert_array_equal(combined.data, np.ones((100, 100)) * 1.5) - self.assertEqual(len(combined.header['GSP_IC*']), 3) - self.assertTrue(self.prefix in combined.header['GSP_FNAM']) + self.assertTrue('master_bias_2.fits' in self.name_2) + self.assertEqual(master.shape, master_2.shape) -class TimeConversionTest(TestCase): + self.assertTrue(all( + [master.header[key] in self.bias_files for key in master.header['GSP_IC*'].keys()])) + + def test_create_master_bias_wrong_ccd_region_syntax(self): + + self.assertRaises(SyntaxError, + create_master_bias, + self.bias_files, + self.raw_data, + self.reduced_data, + 'Spectroscopy', + 'not-a-ccd-region', + 'not-a-region-either') + + +class CreateMasterFlatsTest(TestCase): def setUp(self): - self.test_time_str = '2018-01-17T12:05:44.250' - self.test_time_sec = 1516190744.0 + self.flat_files = ['flat_{}.fits'.format(i) for i in range(1, 12)] + self.raw_data = os.getcwd() + self.reduced_data = os.getcwd() + self.master_flat_name = 'master_flat.fits' + self.saturation_level = 69257 - def test_convert_time(self): - self.assertEqual(convert_time(self.test_time_str), self.test_time_sec) + self.overscan_region = '[1:10,1:100]' + self.trim_section = '[11:90,11:90]' - def test_get_twilight_time(self): - expected_evening_twilight = '2018-01-17T01:21:26.113' - expected_morning_twilight = '2018-01-17T08:24:38.919' - expected_sun_set_time = '2018-01-17T23:43:46.782' - expected_sun_rise_time = '2018-01-17T10:02:04.508' - evening_twilight, morning_twilight, sun_set, sun_rise\ - = get_twilight_time([self.test_time_str]) + self.ccd = CCDData(data=np.ones((100, 100)), + meta=fits.Header(), + unit='adu') - self.assertEqual(evening_twilight, expected_evening_twilight) - self.assertEqual(morning_twilight, expected_morning_twilight) - self.assertEqual(sun_set, expected_sun_set_time) - self.assertEqual(sun_rise, expected_sun_rise_time) + self.ccd.data[11:90, 10:90] += 5 + self.ccd.header.set('CCDSUM', value='1 1') + self.ccd.header.set('INSTCONF', value='Red') + self.ccd.header.set('GAIN', value=1.48) + self.ccd.header.set('RDNOISE', value=3.89) + + for _file in self.flat_files: + self.ccd.write(os.path.join(self.reduced_data, _file)) + + self.bias = CCDData(data=np.ones((80, 80)), + meta=fits.Header(), + unit='adu') + self.bias.header.set('CCDSUM', value='1 1') + self.bias.write(os.path.join(self.reduced_data, 'master_bias.fits')) + + def tearDown(self): + for _file in self.flat_files: + os.unlink(os.path.join(self.reduced_data, _file)) + for _file in ['master_bias.fits', self.master_flat_name]: + if os.path.isfile(_file): + os.unlink(_file) + + def test_create_master_flats_no_bias(self): + master, name = create_master_flats( + flat_files=self.flat_files, + raw_data=self.raw_data, + reduced_data=self.reduced_data, + technique='Spectroscopy', + overscan_region=self.overscan_region, + trim_section=self.trim_section, + master_bias_name='master_bias.fits', + new_master_flat_name=self.master_flat_name, + saturation=1, + ignore_bias=True) + + self.assertEqual(self.master_flat_name, os.path.basename(name)) + self.assertTrue(os.path.isfile(name)) + self.assertEqual('none', master.header['GSP_BIAS']) + + def test_create_master_flats_with_bias(self): + + master, name = create_master_flats( + flat_files=self.flat_files, + raw_data=self.raw_data, + reduced_data=self.reduced_data, + technique='Spectroscopy', + overscan_region=self.overscan_region, + trim_section=self.trim_section, + master_bias_name=os.path.join(self.reduced_data, + 'master_bias.fits'), + new_master_flat_name=os.path.join(self.reduced_data, + self.master_flat_name), + saturation=1, + ignore_bias=False) + + self.assertEqual(self.master_flat_name, os.path.basename(name)) + self.assertTrue(os.path.isfile(name)) + self.assertEqual('master_bias.fits', master.header['GSP_BIAS']) + + + def test_create_master_flats_saturated_flats(self): + + file_to_replace = os.path.join(self.reduced_data, self.flat_files[0]) + os.unlink(file_to_replace) + self.assertFalse(os.path.isfile(file_to_replace)) + self.ccd.data[11:90, 10:90] += 2 * self.saturation_level + self.ccd.header.set('OBJECT', value='saturated') + self.ccd.write(os.path.join(self.reduced_data, self.flat_files[0]), + overwrite=True) + + master, name = create_master_flats( + flat_files=self.flat_files, + raw_data=self.raw_data, + reduced_data=self.reduced_data, + technique='Spectroscopy', + overscan_region=self.overscan_region, + trim_section=self.trim_section, + master_bias_name='master_bias.fits', + new_master_flat_name=self.master_flat_name, + saturation=1, + ignore_bias=False) + + self.assertNotEqual(self.flat_files[0], master.header['GSP_IC01']) + + def test_create_master_flats_empty_list(self): + master, name = create_master_flats( + flat_files=[], + raw_data=self.raw_data, + reduced_data=self.reduced_data, + technique='Spectroscopy', + overscan_region=self.overscan_region, + trim_section=self.trim_section, + master_bias_name='master_bias.fits', + new_master_flat_name=self.master_flat_name, + saturation=1, + ignore_bias=False) + self.assertIsNone(master) + self.assertIsNone(name) + + +class CrossCorrelationTest(TestCase): + + def setUp(self): + self.binning = 1 + self.size = 5000 + self.reference_array = np.ones(self.size) + self.compared_array = np.ones(self.size) + self.x_axis = np.arange(0, self.size, 1) + self.gaussian = models.Gaussian1D(stddev=5, amplitude=3000) + self.gaussian.mean.value = int(self.size / 2.) + self.reference_array += self.gaussian(self.x_axis) + + def test_cross_correlation_small_slit(self): + + offset = 500 + + self.gaussian.mean.value -= offset + + self.compared_array += self.gaussian(self.x_axis) + + correlation_value = cross_correlation(reference=self.reference_array, + compared=self.compared_array, + slit_size=1, + serial_binning=1, + mode='full', + plot=False) + + self.assertEqual(offset, correlation_value) + + def test_cross_correlation_large_slit(self): + offset = 500 + + self.gaussian.mean.value -= offset + + self.compared_array += self.gaussian(self.x_axis) + + box_kernel = Box1DKernel(width=5 / 0.15) + + self.compared_array = convolve(self.compared_array, box_kernel) + + correlation_value = cross_correlation(reference=self.reference_array, + compared=self.compared_array, + slit_size=5, + serial_binning=1, + mode='full', + plot=False) + + self.assertEqual(offset, correlation_value) + + +class DefineTrimSectionTest(TestCase): + + def setUp(self): + self.ccd = CCDData(data=np.ones((100, 100)), + meta=fits.Header(), + unit='adu') + self.ccd.header.set('CCDSUM', value='1 1') + self.ccd.header.set('TRIMSEC', value='[10:10,10:10]') + + self.full_path = os.path.join(os.getcwd(), 'testfile.fits') + + self.ccd.write(self.full_path) + + def test_define_trim_section_spectroscopy(self): + + expected_trim_section = '[51:4110,2:100]' + trim_section = define_trim_section(sample_image=self.full_path, + technique='Spectroscopy') + self.assertEqual(expected_trim_section, trim_section) + + def test_define_trim_section_spectroscopy_2x2(self): + self.ccd.header.set('CCDSUM', value='2 2') + self.ccd.write(self.full_path, overwrite=True) + + expected_trim_section = '[26:2055,2:100]' + trim_section = define_trim_section(sample_image=self.full_path, + technique='Spectroscopy') + self.assertEqual(expected_trim_section, trim_section) + + def test_define_trim_section_imaging(self): + + expected_trim_section = '[10:10,10:10]' + trim_section = define_trim_section(sample_image=self.full_path, + technique='Imaging') + + self.assertEqual(expected_trim_section, trim_section) + + def tearDown(self): + if os.path.isfile(self.full_path): + os.unlink(self.full_path) + +class EvaluateWavelengthSolutionTest(TestCase): + + def test__evaluate_solution(self): + differences = np.array([0.5] * 10) + + clipped_differences = np.ma.masked_array(differences, + mask=[0, + 0, + 1, + 0, + 0, + 1, + 0, + 0, + 1, + 0]) + + rms_error, n_points, n_rej = evaluate_wavelength_solution( + clipped_differences=clipped_differences) + + self.assertEqual(rms_error, 0.5) + self.assertEqual(n_points, 10) + self.assertEqual(n_rej, 3) class ExtractionTest(TestCase): @@ -842,8 +933,6 @@ def test_extraction_not_implemented_model(self): spatial_profile=spatial_profile, extraction_name='fractional') - - def test_extraction_exception(self): self.assertRaises(NotImplementedError, extraction, ccd=self.fake_image, target_trace=self.target_trace, @@ -851,944 +940,1394 @@ def test_extraction_exception(self): extraction_name='optimal') -class EvaluateWavelengthSolutionTest(TestCase): +class FitsFileIOAndOps(TestCase): - def test__evaluate_solution(self): - differences = np.array([0.5] * 10) + def setUp(self): + self.fake_image = CCDData(data=np.ones((100, 100)), + meta=fits.Header(), + unit='adu') - clipped_differences = np.ma.masked_array(differences, - mask=[0, - 0, - 1, - 0, - 0, - 1, - 0, - 0, - 1, - 0]) + self.file_name = 'sample_file.fits' + self.target_non_zero = 4 + self.current_directory = os.getcwd() + self.full_path = os.path.join(self.current_directory, self.file_name) + self.parent_file = 'parent_file.fits' - rms_error, n_points, n_rej = evaluate_wavelength_solution( - clipped_differences=clipped_differences) + self.fake_image.header.set('CCDSUM', + value='1 1', + comment='Fake values') - self.assertEqual(rms_error, 0.5) - self.assertEqual(n_points, 10) - self.assertEqual(n_rej, 3) + self.fake_image.header.set('OBSTYPE', + value='OBJECT', + comment='Fake values') + self.fake_image.header.set('GSP_FNAM', + value=self.file_name, + comment='Fake values') -class FixKeywordsTest(TestCase): + self.fake_image.header.set('GSP_PNAM', + value=self.parent_file, + comment='Fake values') - def setUp(self): - self.ccd = CCDData(data=np.ones((100, 100)), - meta=fits.Header(), - unit='adu') - self.full_path = os.path.join(os.getcwd(), 'sample_file.fits') - self.ccd.write(self.full_path) + self.fake_image.write(self.full_path, overwrite=False) - def tearDown(self): - if os.path.isfile(self.full_path): - os.unlink(self.full_path) + def test_write_fits(self): + self.assertTrue(os.path.isfile(self.full_path)) + os.remove(self.full_path) + write_fits(ccd=self.fake_image, + full_path=self.full_path, + parent_file=self.parent_file, + overwrite=False) + self.assertTrue(os.path.isfile(self.full_path)) - def test_fix_keywords(self): - # not really testing anything here - fix_keywords(path=os.getcwd(), pattern="*.fits") + def test_write_fits_directory_does_not_exist(self): + os.unlink(self.full_path) -class SlitTrimTest(TestCase): - # TODO (simon): discuss with Bruno + base_path = os.path.dirname(self.full_path) + self.full_path = os.path.join(base_path, + 'testdir', + os.path.basename(self.full_path)) + self.assertFalse(os.path.isdir(os.path.dirname(self.full_path))) + write_fits(ccd=self.fake_image, + full_path=self.full_path, + parent_file=self.parent_file, + overwrite=True) + self.assertTrue(os.path.isdir(os.path.dirname(self.full_path))) - def setUp(self): - # Create fake image - self.fake_image = CCDData(data=np.ones((100, 100)), - meta=fits.Header(), - unit='adu') - # define - self.slit_low_limit = 5 - self.slit_high_limit = 95 + def test_read_fits(self): + self.fake_image.header.remove('GSP_PNAM') + self.fake_image.write(self.full_path, overwrite=True) - self.reference_slit_trim = '[1:100,{:d}:{:d}]'.format( - self.slit_low_limit + 10 + 1, - self.slit_high_limit - 10) + self.recovered_fake_image = read_fits(self.full_path) + self.assertIsInstance(self.recovered_fake_image, CCDData) - # make a flat-like structure - self.fake_image.data[self.slit_low_limit:self.slit_high_limit, :] = 100 + def test_image_overscan(self): + data_value = 100. + overscan_value = 0.1 + # alter overscan region to a lower number + self.fake_image.data *= data_value + self.fake_image.data[:, 0:5] = overscan_value - def test_get_slit_trim_section__slit_within_data(self): + overscan_region = '[1:6,:]' + self.assertEqual(self.fake_image.data[:, 6:99].mean(), data_value) + self.assertEqual(self.fake_image.data[:, 0:5].mean(), overscan_value) + self.fake_image = image_overscan(ccd=self.fake_image, + overscan_region=overscan_region) - slit_trim = get_slit_trim_section(master_flat=self.fake_image) - # print(fake_image.data[:,5]) - # print(slit_trim) - self.assertEqual(slit_trim, self.reference_slit_trim) + self.assertEqual(self.fake_image.data[:, 6:99].mean(), + data_value - overscan_value) + self.assertEqual(self.fake_image.header['GSP_OVER'], overscan_region) - def test_get_slit_trim_section__slit_full_data(self): - self.fake_image.data[:, :] = 100 + def test_image_overscan_none(self): + new_fake_image = image_overscan(ccd=self.fake_image, + overscan_region=None) + self.assertEqual(new_fake_image, self.fake_image) - slit_trim = get_slit_trim_section(master_flat=self.fake_image) - # print(fake_image.data[:,5]) - self.assertEqual(slit_trim, '[1:100,1:100]') + def test_image_trim(self): + self.assertEqual(self.fake_image.data.shape, (100, 100)) + trim_section = '[1:50,:]' + self.fake_image = image_trim(ccd=self.fake_image, + trim_section=trim_section, + trim_type='trimsec') - def test_image_trim_slit(self): - # # define - # slit_low_limit = 5 - # slit_high_limit = 95 - # - # slit_trim = '[1:100,{:d}:{:d}]'.format(slit_low_limit + 10 + 1, - # slit_high_limit - 10) + self.assertEqual(self.fake_image.data.shape, (100, 50)) + self.assertEqual(self.fake_image.header['GSP_TRIM'], trim_section) + + def test_image_trim_invalid_type(self): + self.assertEqual(self.fake_image.data.shape, (100, 100)) + trim_section = '[1:50,:]' self.fake_image = image_trim(ccd=self.fake_image, - trim_section=self.reference_slit_trim, - trim_type='slit') - self.assertIsInstance(self.fake_image, CCDData) - reference_size = (self.slit_high_limit - 10) - \ - (self.slit_low_limit + 10) - self.assertEqual(self.fake_image.data.shape, (reference_size, 100)) + trim_section=trim_section, + trim_type='invalid_type') + self.assertEqual(self.fake_image.data.shape, (100, 50)) + self.assertEqual(self.fake_image.header['GSP_TRIM'], trim_section) - self.assertEqual(self.fake_image.header['GSP_SLIT'], - self.reference_slit_trim) + def test_image_trim_trim_section_none(self): + self.assertEqual(self.fake_image.data.shape, (100, 100)) + self.fake_image = image_trim(ccd=self.fake_image, + trim_section=None, + trim_type='trimsec') + self.assertEqual(self.fake_image.data.shape, (100, 100)) + def test_save_extracted_target_zero(self): + self.fake_image.header.set('GSP_FNAM', value=self.file_name) + same_fake_image = save_extracted(ccd=self.fake_image, + destination=self.current_directory, + prefix='e', + target_number=0) + self.assertEqual(same_fake_image, self.fake_image) + self.assertTrue(os.path.isfile('e' + self.file_name)) -class RaDecConversion(TestCase): + def test_save_extracted_target_non_zero(self): + self.fake_image.header.set('GSP_FNAM', value=self.file_name) + same_fake_image = save_extracted(ccd=self.fake_image, + destination=self.current_directory, + prefix='e', + target_number=self.target_non_zero) + self.assertEqual(same_fake_image, self.fake_image) + self.assertTrue(os.path.isfile('e' + re.sub('.fits', + '_target_{:d}.fits'.format( + self.target_non_zero), + self.file_name))) - def setUp(self): - self.ra = '19:09:55.026' - self.dec = '-68:18:01.901' - self.reference_ra = 287.479275 - self.reference_dec = -68.3005281 + def test_save_extracted_target_zero_comp(self): + self.fake_image.header.set('GSP_FNAM', value=self.file_name) + self.fake_image.header.set('OBSTYPE', value='COMP') + self.fake_image.header.set('GSP_EXTR', value='100.00:101.00') + same_fake_image = save_extracted(ccd=self.fake_image, + destination=self.current_directory, + prefix='e', + target_number=0) - def test_ra_dec_to_deg_negative_dec(self): - radeg, decdeg = ra_dec_to_deg(right_ascension=self.ra, - declination=self.dec) - self.assertAlmostEqual(radeg, self.reference_ra) - self.assertAlmostEqual(decdeg, self.reference_dec) + self.assertEqual(same_fake_image, self.fake_image) + self.assertTrue(os.path.isfile(self.fake_image.header['GSP_FNAM'])) - def test_ra_dec_to_deg_positive_dec(self): - self.dec = '68:18:01.901' - radeg, decdeg = ra_dec_to_deg(right_ascension=self.ra, - declination=self.dec) - self.assertAlmostEqual(radeg, self.reference_ra) - self.assertAlmostEqual(decdeg, -1 * self.reference_dec) + def tearDown(self): + files_to_remove = [self.full_path, self.fake_image.header['GSP_FNAM']] + + for _file in files_to_remove: + if os.path.isfile(_file): + os.unlink(_file) + if 'testdir' in self.full_path: + os.rmdir(os.path.dirname(self.full_path)) -class ReferenceDataTest(TestCase): + +class FixKeywordsTest(TestCase): def setUp(self): - self.rd = ReferenceData( - reference_dir=os.path.join(os.getcwd(), - 'goodman_pipeline/data/ref_comp')) - self.ccd = CCDData(data=np.ones((800, 2000)), + self.ccd = CCDData(data=np.ones((100, 100)), meta=fits.Header(), unit='adu') - self.ccd.header.set('GRATING', value='400_SYZY') - self.ccd.header.set('GRT_TARG', value=7.5) - self.ccd.header.set('CAM_TARG', value=16.1) + self.full_path = os.path.join(os.getcwd(), 'sample_file.fits') + self.ccd.write(self.full_path) - self.columns = ['object', - 'grating', - 'grt_targ', - 'cam_targ', - 'lamp_hga', - 'lamp_ne', - 'lamp_ar', - 'lamp_fe', - 'lamp_cu',] + def tearDown(self): + if os.path.isfile(self.full_path): + os.unlink(self.full_path) - self.data_exist = [ - ['HgArNe', - '400_SYZY', - 7.5, - 16.1, - 'TRUE', - 'TRUE', - 'FALSE', - 'FALSE', - 'FALSE'], - ['HgAr', - '400_SYZY', - 7.5, - 16.1, - 'TRUE', - 'FALSE', - 'FALSE', - 'FALSE', - 'FALSE']] + def test_fix_keywords(self): + # not really testing anything here + fix_keywords(path=os.getcwd(), pattern="*.fits") - self.data_does_not_exist = [ - ['HgArNe', - 'SYZY_800', - 7.5, - 16.1, - 'TRUE', - 'TRUE', - 'FALSE', - 'FALSE', - 'FALSE'], - ['HgAr', - 'SYZY_800', - 7.5, - 16.1, - 'TRUE', - 'FALSE', - 'FALSE', - 'FALSE', - 'FALSE']] - def test_get_reference_lamp_exist_with_lamps_status_key(self): - self.ccd.header.set('LAMP_HGA', value='TRUE') - self.ccd.header.set('LAMP_NE', value='TRUE') - self.ccd.header.set('LAMP_AR', value='FALSE') - self.ccd.header.set('LAMP_FE', value='FALSE') - self.ccd.header.set('LAMP_CU', value='FALSE') - self.ccd.header.set('LAMP_QUA', value='FALSE') - self.ccd.header.set('LAMP_QPE', value=0) - self.ccd.header.set('LAMP_BUL', value='FALSE') - self.ccd.header.set('LAMP_DOM', value='FALSE') - self.ccd.header.set('LAMP_DPE', value=0) +class GenerateDcrFile(TestCase): + def setUp(self): + self.create = GenerateDcrParFile() + self.ccd = CCDData(data=np.ones((100, 100)), + meta=fits.Header(), + unit='adu') + self.ccd.header.set('INSTCONF', value='Red') + self.ccd.header.set('CCDSUM', value='1 1') - self.ccd.header.set('WAVMODE', value='400_M2') + def test_generate_dcr_par_file(self): + serial, parallel = self.ccd.header['CCDSUM'].split() + instrument = self.ccd.header['INSTCONF'] - ref_lamp = self.rd.get_reference_lamp(header=self.ccd.header) + self.assertEqual(serial, '1') + self.assertEqual(instrument, 'Red') + self.assertEqual(self.create._file_name, 'dcr.par') + self.assertIsInstance(self.create._df, pandas.DataFrame) - self.assertIsInstance(ref_lamp, CCDData) - self.assertEqual(ref_lamp.header['LAMP_HGA'], self.ccd.header['LAMP_HGA']) - self.assertEqual(ref_lamp.header['LAMP_NE'], self.ccd.header['LAMP_NE']) - self.assertEqual(ref_lamp.header['WAVMODE'], self.ccd.header['WAVMODE']) + self.assertFalse(os.path.isfile(self.create._file_name)) + self.create() + self.assertTrue(os.path.isfile(self.create._file_name)) - def test_get_reference_lamp_exist_with_object_key(self): - self.ccd.header.set('OBJECT', value='HgArNe') - self.ccd.header.set('WAVMODE', value='400_M2') + self.assertRaises(AssertionError, self.create, 'Green') - ref_lamp = self.rd.get_reference_lamp(header=self.ccd.header) + def tearDown(self): + if os.path.isfile(self.create._file_name): + os.remove(self.create._file_name) - self.assertIsInstance(ref_lamp, CCDData) - self.assertEqual(ref_lamp.header['OBJECT'], self.ccd.header['OBJECT']) - self.assertEqual(ref_lamp.header['WAVMODE'], self.ccd.header['WAVMODE']) - def test_get_reference_lamp_does_not_exist(self): - self.ccd.header.set('OBJECT', value='HgArCu') - self.ccd.header.set('WAVMODE', value='400_M5') +class GetLinesInLampTest(TestCase): - self.assertRaises(NotImplementedError, - self.rd.get_reference_lamp, - self.ccd.header) + def setUp(self): + self.ccd = CCDData(data=np.ones(5000), + meta=fits.Header(), + unit='adu') + self.ccd.header.set('SLIT', value='1.0_LONG_SLIT') + self.ccd.header.set('CCDSUM', value='1 1') + self.ccd.header.set('OBJECT', value='TestSubject') + self.line_centers = np.arange(100, 4900, 200) - def test_lamp_exist(self): - self.ccd.header.set('LAMP_HGA', value='TRUE') - self.ccd.header.set('LAMP_NE', value='TRUE') - self.ccd.header.set('LAMP_AR', value='FALSE') - self.ccd.header.set('LAMP_FE', value='FALSE') - self.ccd.header.set('LAMP_CU', value='FALSE') - self.ccd.header.set('LAMP_QUA', value='FALSE') - self.ccd.header.set('LAMP_QPE', value=0) - self.ccd.header.set('LAMP_BUL', value='FALSE') - self.ccd.header.set('LAMP_DOM', value='FALSE') - self.ccd.header.set('LAMP_DPE', value=0) - self.ccd.header.set('WAVMODE', value='400_M2') - self.assertTrue(self.rd.lamp_exists(header=self.ccd.header)) + self.gaussian = models.Gaussian1D(amplitude=10000, stddev=3) - # HgArNeCu is impossible - self.ccd.header.set('LAMP_CU', value='TRUE') - self.assertFalse(self.rd.lamp_exists(header=self.ccd.header)) + for line_center in self.line_centers: + self.gaussian.mean.value = line_center + self.ccd.data += self.gaussian(range(len(self.ccd.data))) - def test_check_comp_group__lamp_exists(self): + def test_get_lines_in_lamp_wrong_input(self): - comp_group = pandas.DataFrame(self.data_exist, - columns=self.columns) + expected_none = get_lines_in_lamp(ccd=list(range(100))) - new_group = self.rd.check_comp_group(comp_group=comp_group) + self.assertIsNone(expected_none) - self.assertIsInstance(new_group, pandas.DataFrame) - self.assertFalse(comp_group.equals(new_group)) - self.assertEqual(len(new_group), 1) - def test_check_comp_group__lamp_does_not_exist(self): - comp_group = pandas.DataFrame(self.data_does_not_exist, - columns=self.columns) + def test_get_lines_in_lamp_narrow_slit(self): + recovered_lines = get_lines_in_lamp(ccd=self.ccd, plots=False) + np.testing.assert_allclose(self.line_centers, recovered_lines) - new_group = self.rd.check_comp_group(comp_group=comp_group) + def test_get_lines_in_lamp_broad_slit(self): + self.ccd.header.set('SLIT', value='5.0_LONG_SLIT') - self.assertIsInstance(new_group, pandas.DataFrame) - self.assertTrue(comp_group.equals(new_group)) + box_kernel = Box1DKernel(width=5.0 / 0.15) + self.ccd.data = convolve(self.ccd.data, box_kernel) + recovered_lines = get_lines_in_lamp(ccd=self.ccd, plots=False) + np.testing.assert_allclose(self.line_centers, recovered_lines, atol=0.6) -class RecordTraceInformationTest(TestCase): + +class GetOverscanRegionTest(TestCase): def setUp(self): - self.ccd = CCDData(data=np.ones((800, 2000)), + self.ccd = CCDData(data=np.ones((100, 100)), meta=fits.Header(), unit='adu') + self.ccd.header.set('CCDSUM', value='1 1') + self.ccd.header.set('TRIMSEC', value='[10:10,10:10]') - self.all_keywords = ['GSP_TMOD', - 'GSP_TORD', - 'GSP_TC00', - 'GSP_TC01', - 'GSP_TC02', - 'GSP_TERR'] + self.full_path = os.path.join(os.getcwd(), 'testfile.fits') - self.trace_info = collections.OrderedDict() + self.ccd.write(self.full_path) - self.trace_info['GSP_TMOD'] = ['Polinomial1D', - 'Model name used to fit trace'] + def tearDown(self): + if os.path.isfile(self.full_path): + os.unlink(self.full_path) - self.trace_info['GSP_TORD'] = [2, 'Degree of the model used to fit ' - 'target trace'] + def test_get_overscan_region_spectroscopy_blue(self): + self.ccd.header.set('INSTCONF', 'Blue') + self.ccd.write(self.full_path, overwrite=True) - self.trace_info['GSP_TC00'] = [500, 'Parameter c0'] - self.trace_info['GSP_TC01'] = [1, 'Parameter c1'] - self.trace_info['GSP_TC02'] = [2, 'Parameter c2'] - self.trace_info['GSP_TERR'] = [0.5, 'RMS error of target trace'] + expected_overscan = '[1:16,1:100]' - def test_record_trace_information(self): - ccd = record_trace_information(ccd=self.ccd, trace_info=self.trace_info) - new_keys = [key for key in ccd.header.keys()] + overscan_region = get_overscan_region(sample_image=self.full_path, + technique='Spectroscopy') - self.assertTrue(all([key in new_keys for key in self.all_keywords])) - self.assertEqual(ccd.header['GSP_TMOD'], 'Polinomial1D') - self.assertEqual(ccd.header['GSP_TORD'], 2) + self.assertEqual(expected_overscan, overscan_region) + def test_get_overscan_region_spectroscopy_red(self): + self.ccd.header.set('INSTCONF', 'Red') + self.ccd.write(self.full_path, overwrite=True) -class SearchCompGroupTest(TestCase): + expected_overscan = '[6:49,1:100]' - def setUp(self): - columns = ['object', - 'grating', - 'cam_targ', - 'grt_targ', - 'filter', - 'filter2', - 'lamp_hga', - 'lamp_ne', - 'lamp_ar', - 'lamp_fe', - 'lamp_cu'] + overscan_region = get_overscan_region(sample_image=self.full_path, + technique='Spectroscopy') - self.object_group = pandas.DataFrame( - data=[['NGC2070', - 'SYZY_400', - 16.1, - 7.5, - '', - 'GG455', - 'TRUE', - 'FALSE', - 'FALSE', - 'FALSE', - 'FALSE' - ]], - columns=columns) - self.object_group_no_match = pandas.DataFrame( - data=[['NGC2070', - 'SYZY_600', - 16.1, - 7.5, - '', - 'GG455', - 'TRUE', - 'FALSE', - 'FALSE', - 'FALSE', - 'FALSE']], - columns=columns) - self.comp_groups = [ - pandas.DataFrame( - data=[['HgArNe', - 'SYZY_400', - 16.1, - 7.5, - '', - 'GG455', - 'TRUE', - 'FALSE', - 'FALSE', - 'FALSE', - 'FALSE']], columns=columns), - pandas.DataFrame( - data=[['CuArNe', - 'SYZY_400', - 11.6, - 5.8, - '', - 'GG455', - 'TRUE', - 'FALSE', - 'FALSE', - 'FALSE', - 'FALSE']], columns=columns)] + self.assertEqual(expected_overscan, overscan_region) - self.reference_data = ReferenceData( - reference_dir=os.path.join(os.getcwd(), - 'goodman_pipeline/data/ref_comp')) + def test_get_overscan_region_imaging(self): + overscan_region = get_overscan_region(sample_image=self.full_path, + technique='Imaging') - def test_search_comp_group(self): - result = search_comp_group( - object_group=self.object_group, - comp_groups=self.comp_groups, - reference_data=self.reference_data) - self.assertIsInstance(result, pandas.DataFrame) - self.assertFalse(result.empty) + self.assertIsNone(overscan_region) - def test_search_comp_group_no_match(self): - with self.assertRaises(NoMatchFound): - search_comp_group( - object_group=self.object_group_no_match, - comp_groups=self.comp_groups, - reference_data=self.reference_data) + def test_get_overscan_region_other_technique(self): + overscan_region = get_overscan_region(sample_image=self.full_path, + technique='AnyOther') + + self.assertIsNone(overscan_region) -class SpectroscopicModeTest(TestCase): +class GetSpectralCharacteristicsTest(TestCase): def setUp(self): - self.sm = SpectroscopicMode() - self.ccd = CCDData(data=np.ones((800, 2000)), + self.ccd = CCDData(data=np.random.random_sample(200), meta=fits.Header(), unit='adu') - self.ccd.header.set('GRATING', value='SYZY_400') - self.ccd.header.set('CAM_TARG', value='16.1') - self.ccd.header.set('GRT_TARG', value='7.5') - self.ccd.header.set('FILTER2', value='GG455') - - def test__call__(self): - self.assertRaises(SyntaxError, self.sm) - - mode_m2_header = self.sm(header=self.ccd.header) - - self.assertEqual(mode_m2_header, 'm2') - - mode_m2_keywords = self.sm(grating=self.ccd.header['GRATING'], - camera_targ=self.ccd.header['CAM_TARG'], - grating_targ=self.ccd.header['GRT_TARG'], - blocking_filter=self.ccd.header['FILTER2']) - - self.assertEqual(mode_m2_keywords, 'm2') + self.pixel_size = 15 * u.micrometer + self.goodman_focal_length = 377.3 * u.mm - def test_get_mode(self): - mode_m2 = self.sm.get_mode(grating='400', - camera_targ='16.1', - grating_targ='7.5', - blocking_filter='GG455') - self.assertEqual(mode_m2, 'm2') + def test__get_spectral_characteristics(self): + self.ccd.header.set('SLIT', '1.0_LONG_SLIT') + self.ccd.header.set('GRATING', 'SYZY_400') + self.ccd.header.set('GRT_ANG', 7.5) + self.ccd.header.set('CAM_ANG', 16.1) + self.ccd.header.set('CCDSUM', '1 1') - mode_custom_400 = self.sm.get_mode(grating='400', - camera_targ='16.1', - grating_targ='6.6', - blocking_filter='GG455') + spec_charact = get_spectral_characteristics( + ccd=self.ccd, + pixel_size=self.pixel_size, + instrument_focal_length=self.goodman_focal_length) - self.assertEqual(mode_custom_400, 'Custom_7000nm') + self.assertIsInstance(spec_charact, dict) + self.assertEqual(len(spec_charact), 7) - mode_custom_2100 = self.sm.get_mode(grating='2100', - camera_targ='16.1', - grating_targ='7.5', - blocking_filter='GG455') - self.assertEqual(mode_custom_2100, 'Custom_1334nm') - def test_get_cam_grt_targ_angle(self): +class InterpolationTest(TestCase): - cam_targ, grt_targ = self.sm.get_cam_grt_targ_angle(1800, 'm10') - self.assertIsNone(cam_targ) - self.assertIsNone(grt_targ) + def test_interpolate(self): + initial_array = np.sin(np.arange(0, 3 * np.pi)) + initial_length = len(initial_array) - cam_targ, grt_targ = self.sm.get_cam_grt_targ_angle(930, 'm5') - self.assertEqual(cam_targ, '39.4') - self.assertEqual(grt_targ, '19.7') + new_x_axis, new_array = interpolate(spectrum=initial_array, + interpolation_size=100) - cam_targ, grt_targ = self.sm.get_cam_grt_targ_angle(930, 'm7') - self.assertIsNone(cam_targ) - self.assertIsNone(grt_targ) + self.assertEqual(len(new_x_axis), len(new_array)) + self.assertEqual(len(new_array), initial_length * 100) -class TargetsTest(TestCase): +class IsFileSaturatedTest(TestCase): def setUp(self): - self.ccd = CCDData(data=np.ones((800, 2000)), + self.ccd = CCDData(data=np.ones((100, 100)), meta=fits.Header(), unit='adu') + self.ccd.header.set('INSTCONF', value='Red') + self.ccd.header.set('GAIN', value=1.48) + self.ccd.header.set('RDNOISE', value=3.89) - self.ccd.header.set('GSP_FNAM', - value='fake-name.fits', - comment='Fake file name') + self.half_full_well = 69257 - self.profile_1 = models.Gaussian1D(amplitude=200, - mean=100, - stddev=10).rename('Profile_1') - self.profile_2 = models.Gaussian1D(amplitude=200, - mean=600, - stddev=10).rename('Profile_2') + def test_file_is_saturated(self): + self.ccd.data[:10, :10] = self.half_full_well + 1 + self.assertTrue(is_file_saturated(ccd=self.ccd, threshold=1)) - self.profile_3 = models.Moffat1D(amplitude=200, - x_0=600, - gamma=3).rename('Profile_3') + def test_file_is_not_saturated(self): + self.ccd.data[:10, :10] = self.half_full_well + 1 + self.ccd.data[0, 0] = 1 + self.assertFalse(is_file_saturated(ccd=self.ccd, threshold=1)) - profile_sum = self.profile_1 + self.profile_2 - self.ccd2 = self.ccd.copy() - self.no_targets_ccd = self.ccd.copy() - for i in range(self.ccd.data.shape[1]): - self.ccd.data[:, i] *= profile_sum(range(self.ccd.data.shape[0])) - self.ccd2.data[:, i] *= self.profile_3( - range(self.ccd2.data.shape[0])) - # this add noise to test the removal of masked values - # self.ccd.data[ - # random.randrange(self.ccd.data.shape[0]), - # random.randrange(self.ccd.data.shape[1])] *= 300 - # self.ccd2.data[ - # random.randrange(self.ccd2.data.shape[0]), - # random.randrange(self.ccd2.data.shape[1])] *= 300 +class LinearizeSpectrumTest(TestCase): + def setUp(self): + feature = models.Gaussian1D(amplitude=500, mean=3000, stddev=5) + self.data = np.ones(5000) + feature(range(5000)) + self.solution_model = models.Polynomial1D(degree=3) + self.solution_model.c0.value = 3500 + self.solution_model.c1.value = 1 + self.solution_model.c2.value = 1e-7 + + self.non_linear_x_axis = self.solution_model(range(len(self.data))) + + self.feature_center = self.non_linear_x_axis[3000] + + def test_linearize_spectrum_nans_in_data(self): + self.data[0:10] = np.nan + self.assertRaises(SystemExit, + linearize_spectrum, + self.data, + self.solution_model) + + def test_linearize_spectrum_wrong_input(self): + linear_data = linearize_spectrum(data=self.data, + wavelength_solution=None) + self.assertIsNone(linear_data) + + def test_linearize_spectrum(self): + linear_x_axis, linear_data = linearize_spectrum(data=self.data, + wavelength_solution=self.solution_model) + + new_gaussian = models.Gaussian1D(amplitude=100, + mean=self.feature_center, + stddev=5) + fitter = fitting.LevMarLSQFitter() - def tearDown(self): - del self.ccd - del self.profile_1 - del self.profile_2 - del self.profile_3 + fitted_linear = fitter(new_gaussian, linear_x_axis, linear_data) - def test_identify_targets_moffat(self): - self.ccd.header.set('OBSTYPE', - value='OBJECT', - comment='Fake values') - self.ccd.header.set('SLIT', - value='1.03" long slit', - comment='Fake slit') - self.ccd.header.set('CCDSUM', - value='1 1', - comment='Fake values') - targets = identify_targets(ccd=self.ccd, - fit_model='moffat', - background_threshold=3, - nfind=2, - plots=False) - self.assertEqual(len(targets), 2) - for target in targets: - self.assertIsInstance(target, Model) + np.testing.assert_array_almost_equal(self.feature_center, + fitted_linear.mean.value, + decimal=2) + + +class MasterFlatTest(TestCase): + + def setUp(self): + # create a master flat + self.master_flat = CCDData(data=np.ones((100, 100)), + meta=fits.Header(), + unit='adu') + self.master_flat.header.set('GRATING', value='RALC_1200-BLUE') + self.master_flat.header.set('SLIT', value='0.84" long slit') + self.master_flat.header.set('FILTER2', value='') + self.master_flat.header.set('WAVMODE', value='1200 m2') + self.master_flat_name = 'master_flat_1200m2.fits' + # expected master flat to be retrieved by get_best_flat + self.reference_flat_name = 'master_flat_1200m2_0.84_dome.fits' + # location of sample flats + self.flat_path = 'goodman_pipeline/data/test_data/master_flat' + slit = re.sub('[A-Za-z" ]', + '', + self.master_flat.header['SLIT']) + self.flat_name_base = re.sub('.fits', + '_' + slit + '*.fits', + self.master_flat_name) + + # save a master flat with some random structure. + + self.master_flat_name_norm = 'flat_to_normalize.fits' + # add a bias level + self.master_flat.data += 300. + # add noise + self.master_flat.data += np.random.random_sample( + self.master_flat.data.shape) + + self.master_flat.write(os.path.join(self.flat_path, + self.master_flat_name_norm), + overwrite=False) + + def tearDown(self): + full_path = os.path.join(self.flat_path, + self.master_flat_name_norm) + + self.assertTrue(os.path.isfile(full_path)) + if os.path.isfile(full_path): + os.unlink(full_path) + self.assertFalse(os.path.isfile(full_path)) + + # remove normalized flat + norm_flat = re.sub('flat_to_', 'norm_flat_to_', full_path) + if os.path.isfile(norm_flat): + os.unlink(norm_flat) + self.assertFalse(os.path.isfile(norm_flat)) + + def test_get_best_flat(self): + # print(self.flat_name_base) + + master_flat, master_flat_name = get_best_flat( + flat_name=self.flat_name_base, + path=self.flat_path) + self.assertIsInstance(master_flat, CCDData) + self.assertEqual(os.path.basename(master_flat_name), + self.reference_flat_name) + + def test_get_best_flat_fail(self): + # Introduce an error that will never produce a result. + wrong_flat_name = re.sub('1200m2', '1300m2', self.flat_name_base) + master_flat, master_flat_name = get_best_flat( + flat_name=wrong_flat_name, + path=self.flat_path) + self.assertIsNone(master_flat) + self.assertIsNone(master_flat_name) + + def test_normalize_master_flat(self): + methods = ['mean', 'simple', 'full'] + for method in methods: + self.assertNotAlmostEqual(self.master_flat.data.mean(), 1.) + normalized_flat, normalized_flat_name = normalize_master_flat( + master=self.master_flat, + name=os.path.join(self.flat_path, + self.master_flat_name_norm), + method=method) + + self.assertAlmostEqual(normalized_flat.data.mean(), 1., + delta=0.001) + self.assertEqual(normalized_flat.header['GSP_NORM'], method) + self.assertIn('norm_', normalized_flat_name) + + +class NameMasterFlatsTest(TestCase): + + def setUp(self): + self.reduced_data = os.getcwd() + + #reference + date = '2019-08-28' + self.twilight_start_evening = '2019-08-27T23:45:00.022' + self.twilight_end_morning = '2019-08-28T09:43:20.023' + self.sun_set_time = '2019-08-27T22:21:00.437' + self.sun_rise_time = '2019-08-28T11:07:11.851' + + self.header = fits.Header() + self.header.set('GRATING', value='400_SYZY') + self.header.set('GRT_TARG', value=7.5) + self.header.set('CAM_TARG', value=16.1) + self.header.set('FILTER', value='') + self.header.set('FILTER2', value='') + self.header.set('SLIT', value='1.0_LONG_SLIT') + + def test_name_master_flats_spectroscopy(self): + + expected_name = 'master_flat_TestSubject_400m2_GG455_1.0_dome.fits' + + self.header.set('FILTER2', value='GG455') + self.header.set('DATE-OBS', value='2019-08-27T20:21:00.437') + + flat_name = name_master_flats( + header=self.header, + technique='Spectroscopy', + reduced_data=self.reduced_data, + sun_set=self.sun_set_time, + sun_rise=self.sun_rise_time, + evening_twilight=self.twilight_start_evening, + morning_twilight=self.twilight_end_morning, + target_name='TestSubject', + get=False) + self.assertEqual(expected_name, os.path.basename(flat_name)) + + def test_name_master_flats_spectroscopy_no_grating_no_filter2(self): + + expected_name = 'master_flat_TestSubject_no_grating_1.0_sky.fits' + + self.header.set('GRATING', value='') + self.header.set('DATE-OBS', value='2019-08-27T23:40:00.022') + + flat_name = name_master_flats( + header=self.header, + technique='Spectroscopy', + reduced_data=self.reduced_data, + sun_set=self.sun_set_time, + sun_rise=self.sun_rise_time, + evening_twilight=self.twilight_start_evening, + morning_twilight=self.twilight_end_morning, + target_name='TestSubject', + get=False) + + self.assertEqual(expected_name, os.path.basename(flat_name)) + + def test_name_master_flats_imaging_no_filter(self): + expected_name = 'master_flat_NO_FILTER_night.fits' + self.header.set('DATE-OBS', value='2019-08-27T23:50:00.022') + + flat_name = name_master_flats( + header=self.header, + technique='Imaging', + reduced_data=self.reduced_data, + sun_set=self.sun_set_time, + sun_rise=self.sun_rise_time, + evening_twilight=self.twilight_start_evening, + morning_twilight=self.twilight_end_morning, + target_name='TestSubject', + get=False) + self.assertEqual(expected_name, os.path.basename(flat_name)) + + def test_name_master_flats_imaging(self): + expected_name = 'master_flat_u_BESSEL_night.fits' + self.header.set('DATE-OBS', value='2019-08-27T23:50:00.022') + self.header.set('FILTER', value='u-BESSEL') + + flat_name = name_master_flats( + header=self.header, + technique='Imaging', + reduced_data=self.reduced_data, + sun_set=self.sun_set_time, + sun_rise=self.sun_rise_time, + evening_twilight=self.twilight_start_evening, + morning_twilight=self.twilight_end_morning, + target_name='TestSubject', + get=False) + self.assertEqual(expected_name, os.path.basename(flat_name)) + + def test_name_master_flats_get_true(self): + expected_name = 'master_flat_TestSubject_400m2_GG455_1.0*.fits' + + self.header.set('FILTER2', value='GG455') + self.header.set('DATE-OBS', value='2019-08-27T20:21:00.437') + + flat_name = name_master_flats( + header=self.header, + technique='Spectroscopy', + reduced_data=self.reduced_data, + sun_set=self.sun_set_time, + sun_rise=self.sun_rise_time, + evening_twilight=self.twilight_start_evening, + morning_twilight=self.twilight_end_morning, + target_name='TestSubject', + get=True) + self.assertEqual(expected_name, os.path.basename(flat_name)) + + +class NightDataContainerTests(TestCase): + + def setUp(self): + self.container = NightDataContainer(path=os.getcwd(), + instrument='Red', + technique='Spectroscopy') + + columns = ['file', 'obstype'] + sample_data_1 = [['file1.fits', 'OBJECT']] + sample_data_2 = [['file1.fits', 'OBJECT'], + ['file2.fits', 'OBJECT']] + + self.sample_df_1 = pandas.DataFrame(sample_data_1, columns=columns) + self.sample_df_2 = pandas.DataFrame(sample_data_2, columns=columns) + + def test___repr___method_empty(self): + result = self.container.__repr__() + self.assertEqual(result, 'Empty Data Container') + + def test___repr___method_not_empty(self): + self.container.is_empty = False + self.container.gain = 1 + self.container.rdnoise = 1 + self.container.roi = 'roi' + result = self.container.__repr__() + + self.assertIn('Full Path: {:s}'.format(os.getcwd()), result) + self.assertIn('Instrument: Red', result) + self.assertIn('Technique: Spectroscopy', result) + self.assertIn('Is Empty: False', result) + + _expected_content = ['Data Grouping Information', + 'BIAS Group:', + 'Group is Empty', + 'Day FLATs Group:', + 'Dome FLATs Group:', + 'Sky FLATs Group:', + 'COMP Group:', + 'OBJECT Group', + 'OBJECT + COMP Group:'] + + for _line in _expected_content: + self.assertIn(_line, result) + + def test___repr___method_imaging_not_empty(self): + self.container.technique = 'Imaging' + self.container.add_bias(bias_group=self.sample_df_2) + self.container.add_day_flats(day_flats=self.sample_df_1) + self.container.add_data_group(data_group=self.sample_df_2) + + # + self.container.dome_flats = [self.sample_df_1] + self.container.sky_flats = [self.sample_df_2] + + self.container.gain = 1 + self.container.rdnoise = 1 + self.container.roi = 'roi' + result = self.container.__repr__() + + self.assertNotIn('Group is Empty', result) + + + @skip + def test__get_group_repr(self): + pass + + def test_add_bias_imaging_insufficient_bias(self): + self.container.technique = 'Imaging' + self.container.add_bias(bias_group=self.sample_df_1) + self.assertTrue(self.container.bias is None) + self.assertTrue(self.container.is_empty) + + def test_add_bias_spectroscopy_insufficient_bias(self): + self.container.add_bias(bias_group=self.sample_df_1) + self.assertTrue(self.container.bias is None) + self.assertTrue(self.container.is_empty) + + def test_add_bias(self): + self.container.add_bias(bias_group=self.sample_df_2) + self.container.add_bias(bias_group=self.sample_df_2) + self.assertFalse(self.container.bias is None) + self.assertFalse(self.container.is_empty) + + def test_add_day_flats(self): + self.container.add_day_flats(day_flats=self.sample_df_1) + self.assertIsInstance(self.container.day_flats[0], pandas.DataFrame) + self.container.add_day_flats(day_flats=self.sample_df_2) + self.assertFalse(self.container.day_flats is None) + self.assertFalse(self.container.is_empty) + + def test_add_data_group(self): + self.container.add_data_group(data_group=self.sample_df_1) + self.assertIsInstance(self.container.data_groups[0], pandas.DataFrame) + self.container.add_data_group(data_group=self.sample_df_2) + self.assertFalse(self.container.data_groups is None) + self.assertFalse(self.container.is_empty) + + def test_add_comp_group(self): + self.container.add_comp_group(comp_group=self.sample_df_1) + self.assertIsInstance(self.container.comp_groups[0], pandas.DataFrame) + self.container.add_comp_group(comp_group=self.sample_df_2) + self.assertFalse(self.container.comp_groups is None) + self.assertFalse(self.container.is_empty) + + def test_add_object_group(self): + self.container.add_object_group(object_group=self.sample_df_1) + self.assertIsInstance(self.container.object_groups[0], pandas.DataFrame) + self.container.add_object_group(object_group=self.sample_df_2) + self.assertFalse(self.container.object_groups is None) + self.assertFalse(self.container.is_empty) + + def test_add_spec_group(self): + self.container.add_spec_group(spec_group=self.sample_df_1) + self.assertIsInstance(self.container.spec_groups[0], pandas.DataFrame) + self.container.add_spec_group(spec_group=self.sample_df_2) + self.assertFalse(self.container.spec_groups is None) + self.assertFalse(self.container.is_empty) + + def test_set_sun_times(self): + _sun_set = '2019-01-01T18:00:00' + _sun_rise = '2019-01-01T06:00:00' + self.container.set_sun_times(sun_set=_sun_set, sun_rise=_sun_rise) + + self.assertEqual(self.container.sun_set_time, _sun_set) + self.assertEqual(self.container.sun_rise_time, _sun_rise) + + def test_set_twilight_times(self): + _evening = '2019-01-01T18:00:00' + _morning = '2019-01-01T06:00:00' + + self.container.set_twilight_times(evening=_evening, morning=_morning) + + self.assertEqual(self.container.evening_twilight, _evening) + self.assertEqual(self.container.morning_twilight, _morning) + + def test_set_readout(self): + _gain = 1.48 + _rdnoise = 3.89 + _roi = 'Spectroscopic 2x2' + + self.container.set_readout(gain=_gain, rdnoise=_rdnoise, roi=_roi) + + self.assertEqual(self.container.gain, _gain) + self.assertEqual(self.container.rdnoise, _rdnoise) + self.assertEqual(self.container.roi, _roi) + + +class RaDecConversion(TestCase): + + def setUp(self): + self.ra = '19:09:55.026' + self.dec = '-68:18:01.901' + self.reference_ra = 287.479275 + self.reference_dec = -68.3005281 + + def test_ra_dec_to_deg_negative_dec(self): + radeg, decdeg = ra_dec_to_deg(right_ascension=self.ra, + declination=self.dec) + self.assertAlmostEqual(radeg, self.reference_ra) + self.assertAlmostEqual(decdeg, self.reference_dec) + + def test_ra_dec_to_deg_positive_dec(self): + self.dec = '68:18:01.901' + radeg, decdeg = ra_dec_to_deg(right_ascension=self.ra, + declination=self.dec) + self.assertAlmostEqual(radeg, self.reference_ra) + self.assertAlmostEqual(decdeg, -1 * self.reference_dec) + + +class RecordTraceInformationTest(TestCase): + + def setUp(self): + self.ccd = CCDData(data=np.ones((800, 2000)), + meta=fits.Header(), + unit='adu') + + self.all_keywords = ['GSP_TMOD', + 'GSP_TORD', + 'GSP_TC00', + 'GSP_TC01', + 'GSP_TC02', + 'GSP_TERR'] + + self.trace_info = collections.OrderedDict() + + self.trace_info['GSP_TMOD'] = ['Polinomial1D', + 'Model name used to fit trace'] + + self.trace_info['GSP_TORD'] = [2, 'Degree of the model used to fit ' + 'target trace'] + + self.trace_info['GSP_TC00'] = [500, 'Parameter c0'] + self.trace_info['GSP_TC01'] = [1, 'Parameter c1'] + self.trace_info['GSP_TC02'] = [2, 'Parameter c2'] + self.trace_info['GSP_TERR'] = [0.5, 'RMS error of target trace'] + + def test_record_trace_information(self): + ccd = record_trace_information(ccd=self.ccd, trace_info=self.trace_info) + new_keys = [key for key in ccd.header.keys()] + + self.assertTrue(all([key in new_keys for key in self.all_keywords])) + self.assertEqual(ccd.header['GSP_TMOD'], 'Polinomial1D') + self.assertEqual(ccd.header['GSP_TORD'], 2) + + +class ReferenceDataTest(TestCase): + + def setUp(self): + self.rd = ReferenceData( + reference_dir=os.path.join(os.getcwd(), + 'goodman_pipeline/data/ref_comp')) + self.ccd = CCDData(data=np.ones((800, 2000)), + meta=fits.Header(), + unit='adu') + self.ccd.header.set('GRATING', value='400_SYZY') + self.ccd.header.set('GRT_TARG', value=7.5) + self.ccd.header.set('CAM_TARG', value=16.1) + + self.columns = ['object', + 'grating', + 'grt_targ', + 'cam_targ', + 'lamp_hga', + 'lamp_ne', + 'lamp_ar', + 'lamp_fe', + 'lamp_cu',] + + self.data_exist = [ + ['HgArNe', + '400_SYZY', + 7.5, + 16.1, + 'TRUE', + 'TRUE', + 'FALSE', + 'FALSE', + 'FALSE'], + ['HgAr', + '400_SYZY', + 7.5, + 16.1, + 'TRUE', + 'FALSE', + 'FALSE', + 'FALSE', + 'FALSE']] - def test_identify_targets_gaussian(self): - self.ccd.header.set('OBSTYPE', - value='OBJECT', - comment='Fake values') - self.ccd.header.set('SLIT', - value='1.03" long slit', - comment='Fake slit') - self.ccd.header.set('CCDSUM', - value='1 1', - comment='Fake values') - targets = identify_targets(ccd=self.ccd, - fit_model='gaussian', - background_threshold=3, - nfind=2, - plots=False) - self.assertEqual(len(targets), 2) - for target in targets: - self.assertIsInstance(target, Model) + self.data_does_not_exist = [ + ['HgArNe', + 'SYZY_800', + 7.5, + 16.1, + 'TRUE', + 'TRUE', + 'FALSE', + 'FALSE', + 'FALSE'], + ['HgAr', + 'SYZY_800', + 7.5, + 16.1, + 'TRUE', + 'FALSE', + 'FALSE', + 'FALSE', + 'FALSE']] - def test_identify_targets_empty_output(self): - self.no_targets_ccd.header.set('OBSTYPE', - value='OBJECT', - comment='Fake values') - self.no_targets_ccd.header.set('SLIT', - value='1.03" long slit', - comment='Fake slit') - self.no_targets_ccd.header.set('CCDSUM', - value='1 1', - comment='Fake values') - targets = identify_targets(ccd=self.no_targets_ccd, - fit_model='gaussian', - background_threshold=3, - nfind=2, - plots=False) - self.assertEqual(len(targets), 0) + def test_get_reference_lamp_exist_with_lamps_status_key(self): + self.ccd.header.set('LAMP_HGA', value='TRUE') + self.ccd.header.set('LAMP_NE', value='TRUE') + self.ccd.header.set('LAMP_AR', value='FALSE') + self.ccd.header.set('LAMP_FE', value='FALSE') + self.ccd.header.set('LAMP_CU', value='FALSE') + self.ccd.header.set('LAMP_QUA', value='FALSE') + self.ccd.header.set('LAMP_QPE', value=0) + self.ccd.header.set('LAMP_BUL', value='FALSE') + self.ccd.header.set('LAMP_DOM', value='FALSE') + self.ccd.header.set('LAMP_DPE', value=0) - def test_trace_gaussian(self): - trace_model = models.Polynomial1D(degree=2) - fitter = fitting.LevMarLSQFitter() - test_trace, trace_rms = trace(ccd=self.ccd, - model=self.profile_1, - trace_model=trace_model, - model_fitter=fitter, - sampling_step=5) - self.assertEqual(test_trace.c0.value, self.profile_1.mean.value) - self.assertAlmostEqual(test_trace.c1.value, 0.) - self.assertAlmostEqual(test_trace.c2.value, 0.) - def test_trace_moffat(self): - trace_model = models.Polynomial1D(degree=2) - fitter = fitting.LevMarLSQFitter() - test_trace, trace_rms = trace(ccd=self.ccd2, - model=self.profile_3, - trace_model=trace_model, - model_fitter=fitter, - sampling_step=5) - self.assertEqual(test_trace.c0.value, self.profile_3.x_0.value) - self.assertAlmostEqual(test_trace.c1.value, 0.) - self.assertAlmostEqual(test_trace.c2.value, 0.) + self.ccd.header.set('WAVMODE', value='400_M2') - def test_trace_not_implemented(self): - trace_model = models.Polynomial1D(degree=2) - fitter = fitting.LevMarLSQFitter() - self.assertRaises(NotImplementedError, - trace, - self.ccd2, - models.BlackBody1D(), - trace_model, - fitter, - 5) + ref_lamp = self.rd.get_reference_lamp(header=self.ccd.header) - def test_trace_targets(self): - targets = [self.profile_1, self.profile_2] - all_traces = trace_targets(ccd=self.ccd, - target_list=targets, - sampling_step=5, - pol_deg=2, - nfwhm=2, - plots=False) - for new_trace, profile, trace_info in all_traces: - self.assertEqual(new_trace.c0.value, profile.mean.value) - self.assertAlmostEqual(new_trace.c1.value, 0) - self.assertAlmostEqual(new_trace.c2.value, 0) + self.assertIsInstance(ref_lamp, CCDData) + self.assertEqual(ref_lamp.header['LAMP_HGA'], self.ccd.header['LAMP_HGA']) + self.assertEqual(ref_lamp.header['LAMP_NE'], self.ccd.header['LAMP_NE']) + self.assertEqual(ref_lamp.header['WAVMODE'], self.ccd.header['WAVMODE']) + def test_get_reference_lamp_no_match_status_keys(self): + self.ccd.header.set('LAMP_HGA', value='TRUE') + self.ccd.header.set('LAMP_NE', value='TRUE') + self.ccd.header.set('LAMP_AR', value='TRUE') + self.ccd.header.set('LAMP_FE', value='TRUE') + self.ccd.header.set('LAMP_CU', value='TRUE') + self.ccd.header.set('LAMP_QUA', value='TRUE') + self.ccd.header.set('LAMP_QPE', value=0) + self.ccd.header.set('LAMP_BUL', value='FALSE') + self.ccd.header.set('LAMP_DOM', value='FALSE') + self.ccd.header.set('LAMP_DPE', value=0) -class FitsFileIOAndOps(TestCase): + self.ccd.header.set('WAVMODE', value='400_M2') - def setUp(self): - self.fake_image = CCDData(data=np.ones((100, 100)), - meta=fits.Header(), - unit='adu') + self.assertRaises(NotImplementedError, + self.rd.get_reference_lamp, + self.ccd.header) - self.file_name = 'sample_file.fits' - self.target_non_zero = 4 - self.current_directory = os.getcwd() - self.full_path = os.path.join(self.current_directory, self.file_name) - self.parent_file = 'parent_file.fits' + def test_get_reference_lamp_exist_with_object_key(self): + self.ccd.header.set('OBJECT', value='HgArNe') + self.ccd.header.set('WAVMODE', value='400_M2') - self.fake_image.header.set('CCDSUM', - value='1 1', - comment='Fake values') + self.ccd.header.set('GSP_P', value='1') + self.ccd.header.set('GSP_P', value='2') + self.ccd.header.set('GSP_P', value='3') + self.ccd.header.set('GSP_P', value='4') - self.fake_image.header.set('OBSTYPE', - value='OBJECT', - comment='Fake values') + self.ccd.header.set('GSP_A', value='0') + self.ccd.header.set('GSP_A', value='100') + self.ccd.header.set('GSP_A', value='0') + self.ccd.header.set('GSP_A', value='200') - self.fake_image.header.set('GSP_FNAM', - value=self.file_name, - comment='Fake values') + ref_lamp = self.rd.get_reference_lamp(header=self.ccd.header) - self.fake_image.header.set('GSP_PNAM', - value=self.parent_file, - comment='Fake values') + self.assertIsInstance(ref_lamp, CCDData) + self.assertEqual(ref_lamp.header['OBJECT'], self.ccd.header['OBJECT']) + self.assertEqual(ref_lamp.header['WAVMODE'], self.ccd.header['WAVMODE']) - self.fake_image.write(self.full_path, overwrite=False) + def test_get_reference_lamp_does_not_exist(self): + self.ccd.header.set('OBJECT', value='HgArCu') + self.ccd.header.set('WAVMODE', value='400_M5') - def test_write_fits(self): - self.assertTrue(os.path.isfile(self.full_path)) - os.remove(self.full_path) - write_fits(ccd=self.fake_image, - full_path=self.full_path, - parent_file=self.parent_file, - overwrite=False) - self.assertTrue(os.path.isfile(self.full_path)) + self.assertRaises(NotImplementedError, + self.rd.get_reference_lamp, + self.ccd.header) - def test_read_fits(self): - self.fake_image.header.remove('GSP_PNAM') - self.fake_image.write(self.full_path, overwrite=True) + def test_lamp_exist(self): + self.ccd.header.set('LAMP_HGA', value='TRUE') + self.ccd.header.set('LAMP_NE', value='TRUE') + self.ccd.header.set('LAMP_AR', value='FALSE') + self.ccd.header.set('LAMP_FE', value='FALSE') + self.ccd.header.set('LAMP_CU', value='FALSE') + self.ccd.header.set('LAMP_QUA', value='FALSE') + self.ccd.header.set('LAMP_QPE', value=0) + self.ccd.header.set('LAMP_BUL', value='FALSE') + self.ccd.header.set('LAMP_DOM', value='FALSE') + self.ccd.header.set('LAMP_DPE', value=0) + self.ccd.header.set('WAVMODE', value='400_M2') + self.assertTrue(self.rd.lamp_exists(header=self.ccd.header)) - self.recovered_fake_image = read_fits(self.full_path) - self.assertIsInstance(self.recovered_fake_image, CCDData) + # HgArNeCu is impossible + self.ccd.header.set('LAMP_CU', value='TRUE') + self.assertFalse(self.rd.lamp_exists(header=self.ccd.header)) - def test_image_overscan(self): - data_value = 100. - overscan_value = 0.1 - # alter overscan region to a lower number - self.fake_image.data *= data_value - self.fake_image.data[:, 0:5] = overscan_value + def test_check_comp_group__lamp_exists(self): - overscan_region = '[1:6,:]' - self.assertEqual(self.fake_image.data[:, 6:99].mean(), data_value) - self.assertEqual(self.fake_image.data[:, 0:5].mean(), overscan_value) - self.fake_image = image_overscan(ccd=self.fake_image, - overscan_region=overscan_region) + comp_group = pandas.DataFrame(self.data_exist, + columns=self.columns) - self.assertEqual(self.fake_image.data[:, 6:99].mean(), - data_value - overscan_value) - self.assertEqual(self.fake_image.header['GSP_OVER'], overscan_region) + new_group = self.rd.check_comp_group(comp_group=comp_group) - def test_image_overscan_none(self): - new_fake_image = image_overscan(ccd=self.fake_image, - overscan_region=None) - self.assertEqual(new_fake_image, self.fake_image) + self.assertIsInstance(new_group, pandas.DataFrame) + self.assertFalse(comp_group.equals(new_group)) + self.assertEqual(len(new_group), 1) - def test_image_trim(self): - self.assertEqual(self.fake_image.data.shape, (100, 100)) - trim_section = '[1:50,:]' - self.fake_image = image_trim(ccd=self.fake_image, - trim_section=trim_section, - trim_type='trimsec') + def test_check_comp_group__lamp_does_not_exist(self): + comp_group = pandas.DataFrame(self.data_does_not_exist, + columns=self.columns) - self.assertEqual(self.fake_image.data.shape, (100, 50)) - self.assertEqual(self.fake_image.header['GSP_TRIM'], trim_section) + new_group = self.rd.check_comp_group(comp_group=comp_group) - def test_image_trim_invalid_type(self): - self.assertEqual(self.fake_image.data.shape, (100, 100)) - trim_section = '[1:50,:]' - self.fake_image = image_trim(ccd=self.fake_image, - trim_section=trim_section, - trim_type='invalid_type') - self.assertEqual(self.fake_image.data.shape, (100, 50)) - self.assertEqual(self.fake_image.header['GSP_TRIM'], trim_section) + self.assertIsInstance(new_group, pandas.DataFrame) + self.assertTrue(comp_group.equals(new_group)) - def test_image_trim_trim_section_none(self): - self.assertEqual(self.fake_image.data.shape, (100, 100)) - self.fake_image = image_trim(ccd=self.fake_image, - trim_section=None, - trim_type='trimsec') - self.assertEqual(self.fake_image.data.shape, (100, 100)) + def test__order_validation(self): + should_be_true = self.rd._order_validation(range(10)) - def test_save_extracted_target_zero(self): - self.fake_image.header.set('GSP_FNAM', value=self.file_name) - same_fake_image = save_extracted(ccd=self.fake_image, - destination=self.current_directory, - prefix='e', - target_number=0) - self.assertEqual(same_fake_image, self.fake_image) - self.assertTrue(os.path.isfile('e' + self.file_name)) + self.assertTrue(should_be_true) + + should_be_false = self.rd._order_validation(range(10)[::-1]) + self.assertFalse(should_be_false) + + def test__load_nist_list(self): + self.assertIsInstance(self.rd.nist, dict) + self.assertEqual(0, len(self.rd.nist)) - def test_save_extracted_target_non_zero(self): - self.fake_image.header.set('GSP_FNAM', value=self.file_name) - same_fake_image = save_extracted(ccd=self.fake_image, - destination=self.current_directory, - prefix='e', - target_number=self.target_non_zero) - self.assertEqual(same_fake_image, self.fake_image) - self.assertTrue(os.path.isfile('e' + re.sub('.fits', - '_target_{:d}.fits'.format( - self.target_non_zero), - self.file_name))) + self.rd._load_nist_list() + self.assertIsInstance(self.rd.nist, dict) + self.assertGreater(len(self.rd.nist), 0) - def test_save_extracted_target_zero_comp(self): - self.fake_image.header.set('GSP_FNAM', value=self.file_name) - self.fake_image.header.set('OBSTYPE', value='COMP') - self.fake_image.header.set('GSP_EXTR', value='100.00:101.00') - same_fake_image = save_extracted(ccd=self.fake_image, - destination=self.current_directory, - prefix='e', - target_number=0) - self.assertEqual(same_fake_image, self.fake_image) - self.assertTrue(os.path.isfile(self.fake_image.header['GSP_FNAM'])) +class SaturationValuesTest(TestCase): - def tearDown(self): - files_to_remove = [self.full_path, self.fake_image.header['GSP_FNAM']] + def setUp(self): + self.ccd = CCDData(data=np.ones((100, 100)), + meta=fits.Header(), + unit='adu') + self.ccd.header.set('INSTCONF', value='Red') + self.ccd.header.set('GAIN', value=1.48) + self.ccd.header.set('RDNOISE', value=3.89) - for _file in files_to_remove: - if os.path.isfile(_file): - os.unlink(_file) + self.half_full_well = 69257 + self.saturation_values = SaturationValues(ccd=self.ccd) -class NightDataContainerTests(TestCase): + def test_half_full_well_value(self): + self.assertEqual(self.saturation_values.saturation_value, + self.half_full_well) + + def test_empty_result(self): + self.ccd.header['GAIN'] = 2.3 + result = self.saturation_values.get_saturation_value(ccd=self.ccd) + self.assertIsNone(result) + self.assertIsNone(self.saturation_values.saturation_value) + + +class SearchCompGroupTest(TestCase): def setUp(self): - self.container = NightDataContainer(path=os.getcwd(), - instrument='Red', - technique='Spectroscopy') + columns = ['object', + 'grating', + 'cam_targ', + 'grt_targ', + 'filter', + 'filter2', + 'lamp_hga', + 'lamp_ne', + 'lamp_ar', + 'lamp_fe', + 'lamp_cu'] - columns = ['file', 'obstype'] - sample_data_1 = [['file1.fits', 'OBJECT']] - sample_data_2 = [['file1.fits', 'OBJECT'], - ['file2.fits', 'OBJECT']] + self.object_group = pandas.DataFrame( + data=[['NGC2070', + 'SYZY_400', + 16.1, + 7.5, + '', + 'GG455', + 'TRUE', + 'FALSE', + 'FALSE', + 'FALSE', + 'FALSE' + ]], + columns=columns) + self.object_group_no_match = pandas.DataFrame( + data=[['NGC2070', + 'SYZY_600', + 16.1, + 7.5, + '', + 'GG455', + 'TRUE', + 'FALSE', + 'FALSE', + 'FALSE', + 'FALSE']], + columns=columns) + self.comp_groups = [ + pandas.DataFrame( + data=[['HgArNe', + 'SYZY_400', + 16.1, + 7.5, + '', + 'GG455', + 'TRUE', + 'FALSE', + 'FALSE', + 'FALSE', + 'FALSE']], columns=columns), + pandas.DataFrame( + data=[['CuArNe', + 'SYZY_400', + 11.6, + 5.8, + '', + 'GG455', + 'TRUE', + 'FALSE', + 'FALSE', + 'FALSE', + 'FALSE']], columns=columns)] - self.sample_df_1 = pandas.DataFrame(sample_data_1, columns=columns) - self.sample_df_2 = pandas.DataFrame(sample_data_2, columns=columns) + self.reference_data = ReferenceData( + reference_dir=os.path.join(os.getcwd(), + 'goodman_pipeline/data/ref_comp')) - def test___repr___method_empty(self): - result = self.container.__repr__() - self.assertEqual(result, 'Empty Data Container') + def test_search_comp_group(self): + result = search_comp_group( + object_group=self.object_group, + comp_groups=self.comp_groups, + reference_data=self.reference_data) + self.assertIsInstance(result, pandas.DataFrame) + self.assertFalse(result.empty) - def test___repr___method_not_empty(self): - self.container.is_empty = False - self.container.gain = 1 - self.container.rdnoise = 1 - self.container.roi = 'roi' - result = self.container.__repr__() + def test_search_comp_group_no_match(self): + with self.assertRaises(NoMatchFound): + search_comp_group( + object_group=self.object_group_no_match, + comp_groups=self.comp_groups, + reference_data=self.reference_data) - self.assertIn('Full Path: {:s}'.format(os.getcwd()), result) - self.assertIn('Instrument: Red', result) - self.assertIn('Technique: Spectroscopy', result) - self.assertIn('Is Empty: False', result) - _expected_content = ['Data Grouping Information', - 'BIAS Group:', - 'Group is Empty', - 'Day FLATs Group:', - 'Dome FLATs Group:', - 'Sky FLATs Group:', - 'COMP Group:', - 'OBJECT Group', - 'OBJECT + COMP Group:'] +class SlitTrimTest(TestCase): + # TODO (simon): discuss with Bruno - for _line in _expected_content: - self.assertIn(_line, result) + def setUp(self): + # Create fake image + self.fake_image = CCDData(data=np.ones((100, 100)), + meta=fits.Header(), + unit='adu') - def test___repr___method_imaging_not_empty(self): - self.container.technique = 'Imaging' - self.container.add_bias(bias_group=self.sample_df_2) - self.container.add_day_flats(day_flats=self.sample_df_1) - self.container.add_data_group(data_group=self.sample_df_2) + # define + self.slit_low_limit = 5 + self.slit_high_limit = 95 - # - self.container.dome_flats = [self.sample_df_1] - self.container.sky_flats = [self.sample_df_2] + self.reference_slit_trim = '[1:100,{:d}:{:d}]'.format( + self.slit_low_limit + 10 + 1, + self.slit_high_limit - 10) - self.container.gain = 1 - self.container.rdnoise = 1 - self.container.roi = 'roi' - result = self.container.__repr__() + # make a flat-like structure + self.fake_image.data[self.slit_low_limit:self.slit_high_limit, :] = 100 - self.assertNotIn('Group is Empty', result) + def test_get_slit_trim_section__slit_within_data(self): + + slit_trim = get_slit_trim_section(master_flat=self.fake_image) + # print(fake_image.data[:,5]) + # print(slit_trim) + self.assertEqual(slit_trim, self.reference_slit_trim) + def test_get_slit_trim_section__slit_full_data(self): + self.fake_image.data[:, :] = 100 - @skip - def test__get_group_repr(self): - pass + slit_trim = get_slit_trim_section(master_flat=self.fake_image) + # print(fake_image.data[:,5]) + self.assertEqual(slit_trim, '[1:100,1:100]') - def test_add_bias_imaging_insufficient_bias(self): - self.container.technique = 'Imaging' - self.container.add_bias(bias_group=self.sample_df_1) - self.assertTrue(self.container.bias is None) - self.assertTrue(self.container.is_empty) + def test_image_trim_slit(self): + # # define + # slit_low_limit = 5 + # slit_high_limit = 95 + # + # slit_trim = '[1:100,{:d}:{:d}]'.format(slit_low_limit + 10 + 1, + # slit_high_limit - 10) + self.fake_image = image_trim(ccd=self.fake_image, + trim_section=self.reference_slit_trim, + trim_type='slit') + self.assertIsInstance(self.fake_image, CCDData) + reference_size = (self.slit_high_limit - 10) - \ + (self.slit_low_limit + 10) + self.assertEqual(self.fake_image.data.shape, (reference_size, 100)) - def test_add_bias_spectroscopy_insufficient_bias(self): - self.container.add_bias(bias_group=self.sample_df_1) - self.assertTrue(self.container.bias is None) - self.assertTrue(self.container.is_empty) + self.assertEqual(self.fake_image.header['GSP_SLIT'], + self.reference_slit_trim) - def test_add_bias(self): - self.container.add_bias(bias_group=self.sample_df_2) - self.container.add_bias(bias_group=self.sample_df_2) - self.assertFalse(self.container.bias is None) - self.assertFalse(self.container.is_empty) - def test_add_day_flats(self): - self.container.add_day_flats(day_flats=self.sample_df_1) - self.assertIsInstance(self.container.day_flats[0], pandas.DataFrame) - self.container.add_day_flats(day_flats=self.sample_df_2) - self.assertFalse(self.container.day_flats is None) - self.assertFalse(self.container.is_empty) - def test_add_data_group(self): - self.container.add_data_group(data_group=self.sample_df_1) - self.assertIsInstance(self.container.data_groups[0], pandas.DataFrame) - self.container.add_data_group(data_group=self.sample_df_2) - self.assertFalse(self.container.data_groups is None) - self.assertFalse(self.container.is_empty) - def test_add_comp_group(self): - self.container.add_comp_group(comp_group=self.sample_df_1) - self.assertIsInstance(self.container.comp_groups[0], pandas.DataFrame) - self.container.add_comp_group(comp_group=self.sample_df_2) - self.assertFalse(self.container.comp_groups is None) - self.assertFalse(self.container.is_empty) +class SpectroscopicModeTest(TestCase): + + def setUp(self): + self.sm = SpectroscopicMode() + self.ccd = CCDData(data=np.ones((800, 2000)), + meta=fits.Header(), + unit='adu') + self.ccd.header.set('GRATING', value='SYZY_400') + self.ccd.header.set('CAM_TARG', value='16.1') + self.ccd.header.set('GRT_TARG', value='7.5') + self.ccd.header.set('FILTER2', value='GG455') + + def test__call__(self): + self.assertRaises(SyntaxError, self.sm) - def test_add_object_group(self): - self.container.add_object_group(object_group=self.sample_df_1) - self.assertIsInstance(self.container.object_groups[0], pandas.DataFrame) - self.container.add_object_group(object_group=self.sample_df_2) - self.assertFalse(self.container.object_groups is None) - self.assertFalse(self.container.is_empty) + mode_m2_header = self.sm(header=self.ccd.header) - def test_add_spec_group(self): - self.container.add_spec_group(spec_group=self.sample_df_1) - self.assertIsInstance(self.container.spec_groups[0], pandas.DataFrame) - self.container.add_spec_group(spec_group=self.sample_df_2) - self.assertFalse(self.container.spec_groups is None) - self.assertFalse(self.container.is_empty) + self.assertEqual(mode_m2_header, 'm2') - def test_set_sun_times(self): - _sun_set = '2019-01-01T18:00:00' - _sun_rise = '2019-01-01T06:00:00' - self.container.set_sun_times(sun_set=_sun_set, sun_rise=_sun_rise) + mode_m2_keywords = self.sm(grating=self.ccd.header['GRATING'], + camera_targ=self.ccd.header['CAM_TARG'], + grating_targ=self.ccd.header['GRT_TARG'], + blocking_filter=self.ccd.header['FILTER2']) - self.assertEqual(self.container.sun_set_time, _sun_set) - self.assertEqual(self.container.sun_rise_time, _sun_rise) + self.assertEqual(mode_m2_keywords, 'm2') - def test_set_twilight_times(self): - _evening = '2019-01-01T18:00:00' - _morning = '2019-01-01T06:00:00' + def test_get_mode(self): + mode_m2 = self.sm.get_mode(grating='400', + camera_targ='16.1', + grating_targ='7.5', + blocking_filter='GG455') + self.assertEqual(mode_m2, 'm2') - self.container.set_twilight_times(evening=_evening, morning=_morning) + mode_custom_400 = self.sm.get_mode(grating='400', + camera_targ='16.1', + grating_targ='6.6', + blocking_filter='GG455') - self.assertEqual(self.container.evening_twilight, _evening) - self.assertEqual(self.container.morning_twilight, _morning) + self.assertEqual(mode_custom_400, 'Custom_7000nm') - def test_set_readout(self): - _gain = 1.48 - _rdnoise = 3.89 - _roi = 'Spectroscopic 2x2' + mode_custom_2100 = self.sm.get_mode(grating='2100', + camera_targ='16.1', + grating_targ='7.5', + blocking_filter='GG455') + self.assertEqual(mode_custom_2100, 'Custom_1334nm') - self.container.set_readout(gain=_gain, rdnoise=_rdnoise, roi=_roi) + def test_get_cam_grt_targ_angle(self): - self.assertEqual(self.container.gain, _gain) - self.assertEqual(self.container.rdnoise, _rdnoise) - self.assertEqual(self.container.roi, _roi) + cam_targ, grt_targ = self.sm.get_cam_grt_targ_angle(1800, 'm10') + self.assertIsNone(cam_targ) + self.assertIsNone(grt_targ) + cam_targ, grt_targ = self.sm.get_cam_grt_targ_angle(930, 'm5') + self.assertEqual(cam_targ, '39.4') + self.assertEqual(grt_targ, '19.7') + + cam_targ, grt_targ = self.sm.get_cam_grt_targ_angle(930, 'm7') + self.assertIsNone(cam_targ) + self.assertIsNone(grt_targ) -class SaturationValuesTest(TestCase): + +class TargetsTest(TestCase): def setUp(self): - self.ccd = CCDData(data=np.ones((100, 100)), + self.ccd = CCDData(data=np.ones((800, 2000)), meta=fits.Header(), unit='adu') - self.ccd.header.set('INSTCONF', value='Red') - self.ccd.header.set('GAIN', value=1.48) - self.ccd.header.set('RDNOISE', value=3.89) - self.half_full_well = 69257 + self.ccd.header.set('GSP_FNAM', + value='fake-name.fits', + comment='Fake file name') - self.saturation_values = SaturationValues(ccd=self.ccd) + self.profile_1 = models.Gaussian1D(amplitude=200, + mean=100, + stddev=10).rename('Profile_1') + self.profile_2 = models.Gaussian1D(amplitude=200, + mean=600, + stddev=10).rename('Profile_2') - def test_half_full_well_value(self): - self.assertEqual(self.saturation_values.saturation_value, - self.half_full_well) + self.profile_3 = models.Moffat1D(amplitude=200, + x_0=600, + gamma=3).rename('Profile_3') - def test_empty_result(self): - self.ccd.header['GAIN'] = 2.3 - result = self.saturation_values.get_saturation_value(ccd=self.ccd) - self.assertIsNone(result) - self.assertIsNone(self.saturation_values.saturation_value) + profile_sum = self.profile_1 + self.profile_2 + self.ccd2 = self.ccd.copy() + self.no_targets_ccd = self.ccd.copy() + for i in range(self.ccd.data.shape[1]): + self.ccd.data[:, i] *= profile_sum(range(self.ccd.data.shape[0])) + self.ccd2.data[:, i] *= self.profile_3( + range(self.ccd2.data.shape[0])) + # this add noise to test the removal of masked values + # self.ccd.data[ + # random.randrange(self.ccd.data.shape[0]), + # random.randrange(self.ccd.data.shape[1])] *= 300 + # self.ccd2.data[ + # random.randrange(self.ccd2.data.shape[0]), + # random.randrange(self.ccd2.data.shape[1])] *= 300 -def test_define_trim_section(): - pass + def tearDown(self): + del self.ccd + del self.profile_1 + del self.profile_2 + del self.profile_3 -def test_get_overscan_region(): - pass + def test_identify_targets_moffat(self): + self.ccd.header.set('OBSTYPE', + value='OBJECT', + comment='Fake values') + self.ccd.header.set('SLIT', + value='1.03" long slit', + comment='Fake slit') + self.ccd.header.set('CCDSUM', + value='1 1', + comment='Fake values') + targets = identify_targets(ccd=self.ccd, + fit_model='moffat', + background_threshold=3, + nfind=2, + plots=False) + self.assertEqual(len(targets), 2) + for target in targets: + self.assertIsInstance(target, Model) + def test_identify_targets_gaussian(self): + self.ccd.header.set('OBSTYPE', + value='OBJECT', + comment='Fake values') + self.ccd.header.set('SLIT', + value='1.03" long slit', + comment='Fake slit') + self.ccd.header.set('CCDSUM', + value='1 1', + comment='Fake values') + targets = identify_targets(ccd=self.ccd, + fit_model='gaussian', + background_threshold=3, + nfind=2, + plots=False) + self.assertEqual(len(targets), 2) + for target in targets: + self.assertIsInstance(target, Model) -def test_create_master_bias(): - pass + def test_identify_targets_empty_output(self): + self.no_targets_ccd.header.set('OBSTYPE', + value='OBJECT', + comment='Fake values') + self.no_targets_ccd.header.set('SLIT', + value='1.03" long slit', + comment='Fake slit') + self.no_targets_ccd.header.set('CCDSUM', + value='1 1', + comment='Fake values') + targets = identify_targets(ccd=self.no_targets_ccd, + fit_model='gaussian', + background_threshold=3, + nfind=2, + plots=False) + self.assertEqual(len(targets), 0) + def test_trace_gaussian(self): + trace_model = models.Polynomial1D(degree=2) + fitter = fitting.LevMarLSQFitter() + test_trace, trace_rms = trace(ccd=self.ccd, + model=self.profile_1, + trace_model=trace_model, + model_fitter=fitter, + sampling_step=5) + self.assertEqual(test_trace.c0.value, self.profile_1.mean.value) + self.assertAlmostEqual(test_trace.c1.value, 0.) + self.assertAlmostEqual(test_trace.c2.value, 0.) -def test_create_master_flats(): - pass + def test_trace_moffat(self): + trace_model = models.Polynomial1D(degree=2) + fitter = fitting.LevMarLSQFitter() + test_trace, trace_rms = trace(ccd=self.ccd2, + model=self.profile_3, + trace_model=trace_model, + model_fitter=fitter, + sampling_step=5) + self.assertEqual(test_trace.c0.value, self.profile_3.x_0.value) + self.assertAlmostEqual(test_trace.c1.value, 0.) + self.assertAlmostEqual(test_trace.c2.value, 0.) + def test_trace_not_implemented(self): + trace_model = models.Polynomial1D(degree=2) + fitter = fitting.LevMarLSQFitter() + self.assertRaises(NotImplementedError, + trace, + self.ccd2, + models.BlackBody1D(), + trace_model, + fitter, + 5) -def test_name_master_flats(): - pass + def test_trace_targets(self): + targets = [self.profile_1, self.profile_2] + all_traces = trace_targets(ccd=self.ccd, + target_list=targets, + sampling_step=5, + pol_deg=2, + nfwhm=2, + plots=False) + for new_trace, profile, trace_info in all_traces: + self.assertEqual(new_trace.c0.value, profile.mean.value) + self.assertAlmostEqual(new_trace.c1.value, 0) + self.assertAlmostEqual(new_trace.c2.value, 0) -class IsFileSaturatedTest(TestCase): +class TimeConversionTest(TestCase): def setUp(self): - self.ccd = CCDData(data=np.ones((100, 100)), - meta=fits.Header(), - unit='adu') - self.ccd.header.set('INSTCONF', value='Red') - self.ccd.header.set('GAIN', value=1.48) - self.ccd.header.set('RDNOISE', value=3.89) + self.test_time_str = '2018-01-17T12:05:44.250' + self.test_time_sec = 1516190744.0 - self.half_full_well = 69257 + def test_convert_time(self): + self.assertEqual(convert_time(self.test_time_str), self.test_time_sec) - def test_file_is_saturated(self): - self.ccd.data[:10, :10] = self.half_full_well + 1 - self.assertTrue(is_file_saturated(ccd=self.ccd, threshold=1)) + def test_get_twilight_time(self): + expected_evening_twilight = '2018-01-17T01:21:26.113' + expected_morning_twilight = '2018-01-17T08:24:38.919' + expected_sun_set_time = '2018-01-17T23:43:46.782' + expected_sun_rise_time = '2018-01-17T10:02:04.508' + evening_twilight, morning_twilight, sun_set, sun_rise\ + = get_twilight_time([self.test_time_str]) + + self.assertEqual(evening_twilight, expected_evening_twilight) + self.assertEqual(morning_twilight, expected_morning_twilight) + self.assertEqual(sun_set, expected_sun_set_time) + self.assertEqual(sun_rise, expected_sun_rise_time) + + +class ValidateCcdRegionTest(TestCase): + + def test_validate_ccd_region_valid(self): + self.assertTrue(validate_ccd_region, '[1:1,10:10]') + + def test_validate_ccd_region_invalid(self): + self.assertRaises(SyntaxError, validate_ccd_region, "10:10:10]") - def test_file_is_not_saturated(self): - self.ccd.data[:10, :10] = self.half_full_well + 1 - self.ccd.data[0, 0] = 1 - self.assertFalse(is_file_saturated(ccd=self.ccd, threshold=1)) diff --git a/goodman_pipeline/images/tests/test_goodman_ccd.py b/goodman_pipeline/images/tests/test_goodman_ccd.py index 9c241d5e..4eb25b7e 100644 --- a/goodman_pipeline/images/tests/test_goodman_ccd.py +++ b/goodman_pipeline/images/tests/test_goodman_ccd.py @@ -1,5 +1,5 @@ from __future__ import absolute_import - +from unittest import TestCase, skip from ..goodman_ccd import get_args, MainApp @@ -8,5 +8,3 @@ def test_get_args(): pass -def test_main_app(): - pass \ No newline at end of file diff --git a/goodman_pipeline/images/tests/test_image_processor.py b/goodman_pipeline/images/tests/test_image_processor.py index 7d01bc36..3d923936 100644 --- a/goodman_pipeline/images/tests/test_image_processor.py +++ b/goodman_pipeline/images/tests/test_image_processor.py @@ -31,6 +31,8 @@ def setUp(self): self.half_full_well = 69257 + def test___call__(self): + self.image_processor() def test_process_spectroscopy_science(): pass diff --git a/goodman_pipeline/images/tests/test_night_organizer.py b/goodman_pipeline/images/tests/test_night_organizer.py index 811413bd..447c421d 100644 --- a/goodman_pipeline/images/tests/test_night_organizer.py +++ b/goodman_pipeline/images/tests/test_night_organizer.py @@ -11,6 +11,7 @@ from ..night_organizer import NightOrganizer + def create_fake_data(technique, instrument, path): if os.path.isdir(path): card_values = [ diff --git a/goodman_pipeline/spectroscopy/tests/test_redspec.py b/goodman_pipeline/spectroscopy/tests/test_redspec.py index 5e14067d..338e3a42 100644 --- a/goodman_pipeline/spectroscopy/tests/test_redspec.py +++ b/goodman_pipeline/spectroscopy/tests/test_redspec.py @@ -60,6 +60,10 @@ def test_absolute_path_exists(self): # If source folder does not exists, print message and leave program # error_message = 'Input folder "{} does not exists."' + def test_get_args_output_path_does_not_exist(self): + arguments = ['--data-path', 'does_not_exist'] + self.assertRaises(SystemExit, get_args, arguments) + def test_get_args(): from ...spectroscopy.redspec import get_args diff --git a/goodman_pipeline/spectroscopy/wavelength.py b/goodman_pipeline/spectroscopy/wavelength.py index 6313f86b..577ce0ee 100644 --- a/goodman_pipeline/spectroscopy/wavelength.py +++ b/goodman_pipeline/spectroscopy/wavelength.py @@ -326,7 +326,7 @@ def _automatic_wavelength_solution(self, global_cross_corr = cross_correlation( reference=reference_lamp_ccd.data, - new_array=self.lamp.data, + compared=self.lamp.data, slit_size=slit_size, serial_binning=self.serial_binning) @@ -356,7 +356,7 @@ def _automatic_wavelength_solution(self, correlation_value = cross_correlation( reference=ref_sample, - new_array=lamp_sample, + compared=lamp_sample, slit_size=slit_size, serial_binning=self.serial_binning)