From 318c44eaa4dc0ff8d68c6b52d84cfe110c861e23 Mon Sep 17 00:00:00 2001 From: Stephen Elliott Greenberg Date: Thu, 25 Apr 2024 08:02:24 -0700 Subject: [PATCH 1/3] add off-beam event building option. NOTE: DEFAULT validation is true. Change this before merging to develop --- .../reco/charge/raw_event_builder.py | 104 ++++++++++++++---- .../reco/charge/RawEventGenerator.yaml | 2 + 2 files changed, 84 insertions(+), 22 deletions(-) diff --git a/src/proto_nd_flow/reco/charge/raw_event_builder.py b/src/proto_nd_flow/reco/charge/raw_event_builder.py index 48b5d6af..91157457 100644 --- a/src/proto_nd_flow/reco/charge/raw_event_builder.py +++ b/src/proto_nd_flow/reco/charge/raw_event_builder.py @@ -262,7 +262,7 @@ def get_config(self): rollover_ticks=self.rollover_ticks, ) - def build_events(self, packets, unix_ts, mc_assn=None): + def build_events(self, packets, unix_ts, mc_assn=None, ts=None): # fetch attribute from appropriate process self.cross_rank_get_attrs('event_buffer', 'event_buffer_unix_ts', 'event_buffer_mc_assn') @@ -278,22 +278,23 @@ def build_events(self, packets, unix_ts, mc_assn=None): # sort packets to fix 512 bug packets = np.append(self.event_buffer, packets) if len(self.event_buffer) else packets - # correct for rollovers - rollover = np.zeros((len(packets),), dtype='i8') - for io_group in np.unique(packets['io_group']): - # find rollovers - mask = (packets['io_group'] == io_group) & (packets['packet_type'] == 6) & (packets['trigger_type'] == 83) - rollover[mask] = self.rollover_ticks - # calculate sum of rollovers - mask = (packets['io_group'] == io_group) - rollover[mask] = np.cumsum(rollover[mask]) - rollover[mask] - # correct for readout delay (only in real data) - if mc_assn is None: - mask = (packets['io_group'] == io_group) & (packets['packet_type'] == 0) \ - & (packets['receipt_timestamp'].astype(int) - packets['timestamp'].astype(int) < 0) - rollover[mask] -= self.rollover_ticks - - ts = packets['timestamp'].astype('i8') + rollover + if ts is None: + # correct for rollovers + rollover = np.zeros((len(packets),), dtype='i8') + for io_group in np.unique(packets['io_group']): + # find rollovers + mask = (packets['io_group'] == io_group) & (packets['packet_type'] == 6) & (packets['trigger_type'] == 83) + rollover[mask] = self.rollover_ticks + # calculate sum of rollovers + mask = (packets['io_group'] == io_group) + rollover[mask] = np.cumsum(rollover[mask]) - rollover[mask] + # correct for readout delay (only in real data) + if mc_assn is None: + mask = (packets['io_group'] == io_group) & (packets['packet_type'] == 0) \ + & (packets['receipt_timestamp'].astype(int) - packets['timestamp'].astype(int) < 0) + rollover[mask] -= self.rollover_ticks + + ts = packets['timestamp'].astype('i8') + rollover sorted_idcs = np.argsort(ts) ts = ts[sorted_idcs] @@ -393,17 +394,26 @@ class ExtTrigRawEventBuilder(RawEventBuilder): default_rollover_ticks = 1E7 default_trig_io_grp = 1 + default_build_off_beam_events=False + default_off_beam_window = 1820 // 2 + default_off_beam_threshold = 10 + def __init__(self, **params): super(ExtTrigRawEventBuilder, self).__init__(**params) self.window = params.get('window', self.default_window) self.rollover_ticks = params.get('rollover_ticks', self.default_rollover_ticks) self.trig_io_grp = params.get('trig_io_grp', self.default_trig_io_grp) + self.build_off_beam_events = params.get('build_off_beam_events', self.default_build_off_beam_events) + self.off_beam_window = params.get('off_beam_window', self.default_off_beam_window) + self.off_beam_threshold = params.get('off_beam_threshold', self.default_off_beam_threshold) + self.VALIDATE_HACK = params.get('VALIDATE_HACK', False) + self.event_buffer = np.empty((0,)) self.event_buffer_unix_ts = np.empty((0,), dtype='u8') self.event_buffer_mc_assn = np.empty((0,)) self.prepend_count = 0 self.last_beam_trigger_idx = None - + def get_config(self): return dict( window=self.window, @@ -426,20 +436,32 @@ def build_events(self, packets, unix_ts, mc_assn=None): ts = packets['timestamp'].astype('i8') + rollover sorted_idcs = np.argsort(ts) ts = ts[sorted_idcs] - + packets = packets[sorted_idcs] unix_ts = unix_ts[sorted_idcs] if mc_assn is not None: mc_assn = mc_assn[sorted_idcs] beam_trigger_idxs = np.where((packets['io_group'] == self.trig_io_grp) & (packets['packet_type'] == 7))[0] + if self.VALIDATE_HACK: + n_orig = len(beam_trigger_idxs) + beam_trigger_idxs = beam_trigger_idxs[::3] + print('\n*****************\nValidation HACK! Ommiting beam some triggers:') + print('Total beam triggers:', len(beam_trigger_idxs)) + print('Total off-beam:', n_orig-len(beam_trigger_idxs)) + print('\n*****************\n') + if len(beam_trigger_idxs) == 0: return ([], []) if mc_assn is None else ([], [], []) events = [] event_unix_ts = [] event_mc_assn = [] if mc_assn is not None else None - + + # Mask to keep track of packets associated to beam events + # Only used if off-beam events are built later with unused packets + used_mask = np.zeros( len(unix_ts) ) < -1 + for i, start_idx in enumerate(beam_trigger_idxs): this_trig_time = ts[start_idx] # FIXME & (ts % 1E7 != 0) is a hot fix for PPS signal @@ -450,6 +472,44 @@ def build_events(self, packets, unix_ts, mc_assn=None): if mc_assn is not None: event_mc_assn.append(mc_assn[mask]) - return zip(*[v for v in zip(events, event_unix_ts)]) if mc_assn is None \ - else zip(*[v for v in zip(events, event_unix_ts, event_mc_assn)]) + used_mask = np.logical_or( used_mask, mask ) + + + if not self.build_off_beam_events: + return zip(*[v for v in zip(events, event_unix_ts)]) if mc_assn is None \ + else zip(*[v for v in zip(events, event_unix_ts, event_mc_assn)]) + + # build off beam events using SymmetricRawEventBuilder + if self.VALIDATE_HACK: print('USING OFF BEAM BUILDER!!') + off_beam_config = {'window' : self.off_beam_window, + 'threshold' : self.off_beam_threshold, + 'rollover_ticks' : self.rollover_ticks + } + off_beam_builder = SymmetricWindowRawEventBuilder( **off_beam_config ) + + off_beam_events, off_beam_event_unix_ts, off_beam_event_mc_assn = [], [], [] + if not mc_assn is None: + off_beam_events_list = list(off_beam_builder.build_events(packets[~used_mask], unix_ts[~used_mask], mc_assn[~used_mask], ts=ts[~used_mask])) + + off_beam_events = list(off_beam_events_list[0]) + off_beam_event_unix_ts = list(off_beam_events_list[1]) + off_beam_event_mc_assn = list(off_beam_events_list[2]) + else: + off_beam_events_list = list(off_beam_builder.build_events(packets[~used_mask], unix_ts[~used_mask], mc_assn, ts[~used_mask])) + off_beam_events = list(off_beam_events_list[0]) + off_beam_event_unix_ts = list(off_beam_events_list[1]) + + if self.VALIDATE_HACK: + print('N Beam events found:', len(events)) + print('N Off beam events:', len(off_beam_events)) + + full_events = events + off_beam_events + + if self.VALIDATE_HACK: + print('Total events returning:', len(full_events)) + + full_event_unix_ts = event_unix_ts + off_beam_event_unix_ts + if not mc_assn is None: full_event_mc_assn = event_mc_assn + off_beam_event_mc_assn + return zip(*[v for v in zip(full_events, full_event_unix_ts)]) if mc_assn is None \ + else zip(*[v for v in zip(full_events, full_event_unix_ts, full_event_mc_assn)]) diff --git a/yamls/proto_nd_flow/reco/charge/RawEventGenerator.yaml b/yamls/proto_nd_flow/reco/charge/RawEventGenerator.yaml index 2f6877ec..829abd3a 100644 --- a/yamls/proto_nd_flow/reco/charge/RawEventGenerator.yaml +++ b/yamls/proto_nd_flow/reco/charge/RawEventGenerator.yaml @@ -16,3 +16,5 @@ params: window: 2000 # ExtTrigRawEventBuilder - 2000, more than one drift for 500V/cm trig_io_grp: 1 rollover_ticks: 10000000 # PPS = 1e7 ticks + build_off_beam_events : True + VALIDATE_HACK : True From d33379badd592a9f98d0ae5cb799bf4566cf7383 Mon Sep 17 00:00:00 2001 From: Stephen Elliott Greenberg Date: Thu, 25 Apr 2024 12:33:13 -0700 Subject: [PATCH 2/3] sort events by time --- .../reco/charge/raw_event_builder.py | 32 ++++++++++++++++--- .../reco/charge/RawEventGenerator.yaml | 2 +- 2 files changed, 29 insertions(+), 5 deletions(-) diff --git a/src/proto_nd_flow/reco/charge/raw_event_builder.py b/src/proto_nd_flow/reco/charge/raw_event_builder.py index 91157457..6cd967be 100644 --- a/src/proto_nd_flow/reco/charge/raw_event_builder.py +++ b/src/proto_nd_flow/reco/charge/raw_event_builder.py @@ -262,7 +262,7 @@ def get_config(self): rollover_ticks=self.rollover_ticks, ) - def build_events(self, packets, unix_ts, mc_assn=None, ts=None): + def build_events(self, packets, unix_ts, mc_assn=None, ts=None, return_ts=False): # fetch attribute from appropriate process self.cross_rank_get_attrs('event_buffer', 'event_buffer_unix_ts', 'event_buffer_mc_assn') @@ -326,6 +326,10 @@ def build_events(self, packets, unix_ts, mc_assn=None, ts=None): if mc_assn is not None: self.event_buffer_mc_assn = np.empty((0,), dtype=mc_assn.dtype) self.cross_rank_set_attrs('event_buffer', 'event_buffer_unix_ts', 'event_buffer_mc_assn') + + if return_ts: + return (([], []), []) if mc_assn is None \ + else (([], [], []), []) return ([], []) if mc_assn is None \ else ([], [], []) if not len(event_start_timestamp): @@ -339,6 +343,10 @@ def build_events(self, packets, unix_ts, mc_assn=None, ts=None): if mc_assn is not None: self.event_buffer_mc_assn = mc_assn[mask] self.cross_rank_set_attrs('event_buffer', 'event_buffer_unix_ts', 'event_buffer_mc_assn') + if return_ts: + return (([], []), []) if mc_assn is None \ + else (([], [], []), []) + return ([], []) if mc_assn is None \ else ([], [], []) @@ -377,11 +385,17 @@ def build_events(self, packets, unix_ts, mc_assn=None, ts=None): is_event = np.r_[False, event_mask[event_idcs]] events = np.split(packets, event_idcs) + event_ts = np.split(ts, event_idcs) + event_ts = list( [ np.min(times) for i, times in enumerate(event_ts) if is_event[i] ] ) event_unix_ts = np.split(unix_ts, event_idcs) if mc_assn is not None: event_mc_assn = np.split(mc_assn, event_idcs) # only return packets from events + if return_ts: + return (zip(*[v for i, v in enumerate(zip(events, event_unix_ts)) if is_event[i]]), event_ts) if mc_assn is None \ + else (zip(*[v for i, v in enumerate(zip(events, event_unix_ts, event_mc_assn)) if is_event[i]]), event_ts) + return zip(*[v for i, v in enumerate(zip(events, event_unix_ts)) if is_event[i]]) if mc_assn is None \ else zip(*[v for i, v in enumerate(zip(events, event_unix_ts, event_mc_assn)) if is_event[i]]) @@ -458,12 +472,14 @@ def build_events(self, packets, unix_ts, mc_assn=None): event_unix_ts = [] event_mc_assn = [] if mc_assn is not None else None + start_times = [] + # Mask to keep track of packets associated to beam events # Only used if off-beam events are built later with unused packets used_mask = np.zeros( len(unix_ts) ) < -1 - for i, start_idx in enumerate(beam_trigger_idxs): this_trig_time = ts[start_idx] + start_times.append(this_trig_time) # FIXME & (ts % 1E7 != 0) is a hot fix for PPS signal hotfix_mask = (ts % 1E7 != 0) | ((ts % 1E7 == 0) & (packets['io_group'] == self.trig_io_grp) & (packets['packet_type'] == 7)) mask = ((ts - this_trig_time) >= 0) & ((ts - this_trig_time) <= self.window) & hotfix_mask @@ -489,13 +505,15 @@ def build_events(self, packets, unix_ts, mc_assn=None): off_beam_events, off_beam_event_unix_ts, off_beam_event_mc_assn = [], [], [] if not mc_assn is None: - off_beam_events_list = list(off_beam_builder.build_events(packets[~used_mask], unix_ts[~used_mask], mc_assn[~used_mask], ts=ts[~used_mask])) + (off_beam_events_list, off_beam_ts) = off_beam_builder.build_events(packets[~used_mask], unix_ts[~used_mask], mc_assn[~used_mask], ts=ts[~used_mask], return_ts=True) + off_beam_events_list=list(off_beam_events_list) off_beam_events = list(off_beam_events_list[0]) off_beam_event_unix_ts = list(off_beam_events_list[1]) off_beam_event_mc_assn = list(off_beam_events_list[2]) else: - off_beam_events_list = list(off_beam_builder.build_events(packets[~used_mask], unix_ts[~used_mask], mc_assn, ts[~used_mask])) + (off_beam_events_list, off_beam_ts) = off_beam_builder.build_events(packets[~used_mask], unix_ts[~used_mask], mc_assn, ts[~used_mask], return_ts=True) + off_beam_events_list=list(off_beam_events_list) off_beam_events = list(off_beam_events_list[0]) off_beam_event_unix_ts = list(off_beam_events_list[1]) @@ -511,5 +529,11 @@ def build_events(self, packets, unix_ts, mc_assn=None): full_event_unix_ts = event_unix_ts + off_beam_event_unix_ts if not mc_assn is None: full_event_mc_assn = event_mc_assn + off_beam_event_mc_assn + sorted_event_indices = np.argsort( start_times + off_beam_ts ) + full_events = [full_events[index] for index in sorted_event_indices] + full_event_unix_ts = [full_event_unix_ts[index] for index in sorted_event_indices] + + if not mc_assn is None: full_event_mc_assn = [full_event_mc_assn[index] for index in sorted_event_indices] + return zip(*[v for v in zip(full_events, full_event_unix_ts)]) if mc_assn is None \ else zip(*[v for v in zip(full_events, full_event_unix_ts, full_event_mc_assn)]) diff --git a/yamls/proto_nd_flow/reco/charge/RawEventGenerator.yaml b/yamls/proto_nd_flow/reco/charge/RawEventGenerator.yaml index 829abd3a..276c335d 100644 --- a/yamls/proto_nd_flow/reco/charge/RawEventGenerator.yaml +++ b/yamls/proto_nd_flow/reco/charge/RawEventGenerator.yaml @@ -17,4 +17,4 @@ params: trig_io_grp: 1 rollover_ticks: 10000000 # PPS = 1e7 ticks build_off_beam_events : True - VALIDATE_HACK : True + VALIDATE_HACK : False From 9985c7d0e7c5580852fde010d624cc2aeeffc6e5 Mon Sep 17 00:00:00 2001 From: Stephen Greenberg Date: Fri, 26 Apr 2024 07:45:41 -0700 Subject: [PATCH 3/3] fix versioning in test --- test/env.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/env.yaml b/test/env.yaml index 9422e3aa..cfe2246e 100644 --- a/test/env.yaml +++ b/test/env.yaml @@ -17,7 +17,7 @@ dependencies: - pytest-mpi - pip - pip: - - pyyaml-include + - pyyaml-include==1.3.2 - h5py --no-binary=h5py - adc64format>=0.0.2 - pylandau