Skip to content

Commit

Permalink
Merge pull request #119 from DUNE/feature/add_off_beam_event_building
Browse files Browse the repository at this point in the history
Feature/add off beam event building
  • Loading branch information
diaza authored May 17, 2024
2 parents ad7b959 + 9985c7d commit 322d610
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 21 deletions.
126 changes: 105 additions & 21 deletions src/proto_nd_flow/reco/charge/raw_event_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ def get_config(self):
rollover_ticks=self.rollover_ticks,
)

def build_events(self, packets, unix_ts, mc_assn=None):
def build_events(self, packets, unix_ts, mc_assn=None, ts=None, return_ts=False):
# fetch attribute from appropriate process
self.cross_rank_get_attrs('event_buffer', 'event_buffer_unix_ts', 'event_buffer_mc_assn')

Expand All @@ -278,22 +278,23 @@ def build_events(self, packets, unix_ts, mc_assn=None):
# sort packets to fix 512 bug
packets = np.append(self.event_buffer, packets) if len(self.event_buffer) else packets

# correct for rollovers
rollover = np.zeros((len(packets),), dtype='i8')
for io_group in np.unique(packets['io_group']):
# find rollovers
mask = (packets['io_group'] == io_group) & (packets['packet_type'] == 6) & (packets['trigger_type'] == 83)
rollover[mask] = self.rollover_ticks
# calculate sum of rollovers
mask = (packets['io_group'] == io_group)
rollover[mask] = np.cumsum(rollover[mask]) - rollover[mask]
# correct for readout delay (only in real data)
if mc_assn is None:
mask = (packets['io_group'] == io_group) & (packets['packet_type'] == 0) \
& (packets['receipt_timestamp'].astype(int) - packets['timestamp'].astype(int) < 0)
rollover[mask] -= self.rollover_ticks

ts = packets['timestamp'].astype('i8') + rollover
if ts is None:
# correct for rollovers
rollover = np.zeros((len(packets),), dtype='i8')
for io_group in np.unique(packets['io_group']):
# find rollovers
mask = (packets['io_group'] == io_group) & (packets['packet_type'] == 6) & (packets['trigger_type'] == 83)
rollover[mask] = self.rollover_ticks
# calculate sum of rollovers
mask = (packets['io_group'] == io_group)
rollover[mask] = np.cumsum(rollover[mask]) - rollover[mask]
# correct for readout delay (only in real data)
if mc_assn is None:
mask = (packets['io_group'] == io_group) & (packets['packet_type'] == 0) \
& (packets['receipt_timestamp'].astype(int) - packets['timestamp'].astype(int) < 0)
rollover[mask] -= self.rollover_ticks

ts = packets['timestamp'].astype('i8') + rollover

sorted_idcs = np.argsort(ts)
ts = ts[sorted_idcs]
Expand Down Expand Up @@ -325,6 +326,10 @@ def build_events(self, packets, unix_ts, mc_assn=None):
if mc_assn is not None:
self.event_buffer_mc_assn = np.empty((0,), dtype=mc_assn.dtype)
self.cross_rank_set_attrs('event_buffer', 'event_buffer_unix_ts', 'event_buffer_mc_assn')

if return_ts:
return (([], []), []) if mc_assn is None \
else (([], [], []), [])
return ([], []) if mc_assn is None \
else ([], [], [])
if not len(event_start_timestamp):
Expand All @@ -338,6 +343,10 @@ def build_events(self, packets, unix_ts, mc_assn=None):
if mc_assn is not None:
self.event_buffer_mc_assn = mc_assn[mask]
self.cross_rank_set_attrs('event_buffer', 'event_buffer_unix_ts', 'event_buffer_mc_assn')
if return_ts:
return (([], []), []) if mc_assn is None \
else (([], [], []), [])

return ([], []) if mc_assn is None \
else ([], [], [])

Expand Down Expand Up @@ -376,11 +385,17 @@ def build_events(self, packets, unix_ts, mc_assn=None):
is_event = np.r_[False, event_mask[event_idcs]]

events = np.split(packets, event_idcs)
event_ts = np.split(ts, event_idcs)
event_ts = list( [ np.min(times) for i, times in enumerate(event_ts) if is_event[i] ] )
event_unix_ts = np.split(unix_ts, event_idcs)
if mc_assn is not None:
event_mc_assn = np.split(mc_assn, event_idcs)

# only return packets from events
if return_ts:
return (zip(*[v for i, v in enumerate(zip(events, event_unix_ts)) if is_event[i]]), event_ts) if mc_assn is None \
else (zip(*[v for i, v in enumerate(zip(events, event_unix_ts, event_mc_assn)) if is_event[i]]), event_ts)

return zip(*[v for i, v in enumerate(zip(events, event_unix_ts)) if is_event[i]]) if mc_assn is None \
else zip(*[v for i, v in enumerate(zip(events, event_unix_ts, event_mc_assn)) if is_event[i]])

Expand All @@ -393,17 +408,26 @@ class ExtTrigRawEventBuilder(RawEventBuilder):
default_rollover_ticks = 1E7
default_trig_io_grp = 1

default_build_off_beam_events=False
default_off_beam_window = 1820 // 2
default_off_beam_threshold = 10

def __init__(self, **params):
super(ExtTrigRawEventBuilder, self).__init__(**params)
self.window = params.get('window', self.default_window)
self.rollover_ticks = params.get('rollover_ticks', self.default_rollover_ticks)
self.trig_io_grp = params.get('trig_io_grp', self.default_trig_io_grp)
self.build_off_beam_events = params.get('build_off_beam_events', self.default_build_off_beam_events)
self.off_beam_window = params.get('off_beam_window', self.default_off_beam_window)
self.off_beam_threshold = params.get('off_beam_threshold', self.default_off_beam_threshold)
self.VALIDATE_HACK = params.get('VALIDATE_HACK', False)

self.event_buffer = np.empty((0,))
self.event_buffer_unix_ts = np.empty((0,), dtype='u8')
self.event_buffer_mc_assn = np.empty((0,))
self.prepend_count = 0
self.last_beam_trigger_idx = None

def get_config(self):
return dict(
window=self.window,
Expand All @@ -426,22 +450,36 @@ def build_events(self, packets, unix_ts, mc_assn=None):
ts = packets['timestamp'].astype('i8') + rollover
sorted_idcs = np.argsort(ts)
ts = ts[sorted_idcs]

packets = packets[sorted_idcs]
unix_ts = unix_ts[sorted_idcs]
if mc_assn is not None:
mc_assn = mc_assn[sorted_idcs]

beam_trigger_idxs = np.where((packets['io_group'] == self.trig_io_grp) & (packets['packet_type'] == 7))[0]
if self.VALIDATE_HACK:
n_orig = len(beam_trigger_idxs)
beam_trigger_idxs = beam_trigger_idxs[::3]
print('\n*****************\nValidation HACK! Ommiting beam some triggers:')
print('Total beam triggers:', len(beam_trigger_idxs))
print('Total off-beam:', n_orig-len(beam_trigger_idxs))
print('\n*****************\n')

if len(beam_trigger_idxs) == 0:
return ([], []) if mc_assn is None else ([], [], [])

events = []
event_unix_ts = []
event_mc_assn = [] if mc_assn is not None else None

start_times = []

# Mask to keep track of packets associated to beam events
# Only used if off-beam events are built later with unused packets
used_mask = np.zeros( len(unix_ts) ) < -1
for i, start_idx in enumerate(beam_trigger_idxs):
this_trig_time = ts[start_idx]
start_times.append(this_trig_time)
# FIXME & (ts % 1E7 != 0) is a hot fix for PPS signal
hotfix_mask = (ts % 1E7 != 0) | ((ts % 1E7 == 0) & (packets['io_group'] == self.trig_io_grp) & (packets['packet_type'] == 7))
mask = ((ts - this_trig_time) >= 0) & ((ts - this_trig_time) <= self.window) & hotfix_mask
Expand All @@ -450,6 +488,52 @@ def build_events(self, packets, unix_ts, mc_assn=None):
if mc_assn is not None:
event_mc_assn.append(mc_assn[mask])

return zip(*[v for v in zip(events, event_unix_ts)]) if mc_assn is None \
else zip(*[v for v in zip(events, event_unix_ts, event_mc_assn)])
used_mask = np.logical_or( used_mask, mask )


if not self.build_off_beam_events:
return zip(*[v for v in zip(events, event_unix_ts)]) if mc_assn is None \
else zip(*[v for v in zip(events, event_unix_ts, event_mc_assn)])

# build off beam events using SymmetricRawEventBuilder
if self.VALIDATE_HACK: print('USING OFF BEAM BUILDER!!')
off_beam_config = {'window' : self.off_beam_window,
'threshold' : self.off_beam_threshold,
'rollover_ticks' : self.rollover_ticks
}
off_beam_builder = SymmetricWindowRawEventBuilder( **off_beam_config )

off_beam_events, off_beam_event_unix_ts, off_beam_event_mc_assn = [], [], []
if not mc_assn is None:
(off_beam_events_list, off_beam_ts) = off_beam_builder.build_events(packets[~used_mask], unix_ts[~used_mask], mc_assn[~used_mask], ts=ts[~used_mask], return_ts=True)

off_beam_events_list=list(off_beam_events_list)
off_beam_events = list(off_beam_events_list[0])
off_beam_event_unix_ts = list(off_beam_events_list[1])
off_beam_event_mc_assn = list(off_beam_events_list[2])
else:
(off_beam_events_list, off_beam_ts) = off_beam_builder.build_events(packets[~used_mask], unix_ts[~used_mask], mc_assn, ts[~used_mask], return_ts=True)
off_beam_events_list=list(off_beam_events_list)
off_beam_events = list(off_beam_events_list[0])
off_beam_event_unix_ts = list(off_beam_events_list[1])

if self.VALIDATE_HACK:
print('N Beam events found:', len(events))
print('N Off beam events:', len(off_beam_events))

full_events = events + off_beam_events

if self.VALIDATE_HACK:
print('Total events returning:', len(full_events))

full_event_unix_ts = event_unix_ts + off_beam_event_unix_ts
if not mc_assn is None: full_event_mc_assn = event_mc_assn + off_beam_event_mc_assn

sorted_event_indices = np.argsort( start_times + off_beam_ts )
full_events = [full_events[index] for index in sorted_event_indices]
full_event_unix_ts = [full_event_unix_ts[index] for index in sorted_event_indices]

if not mc_assn is None: full_event_mc_assn = [full_event_mc_assn[index] for index in sorted_event_indices]

return zip(*[v for v in zip(full_events, full_event_unix_ts)]) if mc_assn is None \
else zip(*[v for v in zip(full_events, full_event_unix_ts, full_event_mc_assn)])
2 changes: 2 additions & 0 deletions yamls/proto_nd_flow/reco/charge/RawEventGenerator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,5 @@ params:
window: 2000 # ExtTrigRawEventBuilder - 2000, more than one drift for 500V/cm
trig_io_grp: 1
rollover_ticks: 10000000 # PPS = 1e7 ticks
build_off_beam_events : True
VALIDATE_HACK : False

0 comments on commit 322d610

Please sign in to comment.