From 992b5a43695d1944a411e803bb69ac268727b07c Mon Sep 17 00:00:00 2001 From: liviocali Date: Fri, 15 Mar 2024 14:52:58 +0100 Subject: [PATCH 1/8] Update event builder to support batch_size>1 --- env-nompi.yaml | 2 +- env.yaml | 2 +- .../reco/light/adc64_event_generator.py | 53 +++++++++++-------- .../reco/light/LightADC64EventGenerator.yaml | 2 +- 4 files changed, 35 insertions(+), 24 deletions(-) diff --git a/env-nompi.yaml b/env-nompi.yaml index 67b9b6c6..a4a735ee 100644 --- a/env-nompi.yaml +++ b/env-nompi.yaml @@ -10,5 +10,5 @@ dependencies: - pip - pip: - h5flow>=0.1.0 - - adc64format>=0.0.2 + - adc64format>=0.1.0 - pylandau diff --git a/env.yaml b/env.yaml index 67b9b6c6..a4a735ee 100644 --- a/env.yaml +++ b/env.yaml @@ -10,5 +10,5 @@ dependencies: - pip - pip: - h5flow>=0.1.0 - - adc64format>=0.0.2 + - adc64format>=0.1.0 - pylandau diff --git a/src/proto_nd_flow/reco/light/adc64_event_generator.py b/src/proto_nd_flow/reco/light/adc64_event_generator.py index 73dcb747..805195ce 100644 --- a/src/proto_nd_flow/reco/light/adc64_event_generator.py +++ b/src/proto_nd_flow/reco/light/adc64_event_generator.py @@ -82,11 +82,11 @@ def event_dtype(self): return np.dtype([ ('id', 'u8'), # unique identifier ('event', 'i4'), # event number in data file ('sn', 'i4', (self.n_adcs,)), # adc serial number - ('ch', 'u1', (self.n_adcs, self.n_channels)), # channel number - ('utime_ms', 'u8', (self.n_adcs, self.n_channels)), # unix time [ms since epoch] - ('tai_ns', 'u8', (self.n_adcs, self.n_channels)), # time since PPS [ns] - ('wvfm_valid', 'u1', (self.n_adcs, self.n_channels)) # boolean, 1 if channel present in event - ]) + #('ch', 'u1', (self.n_adcs, self.n_channels)), # channel number + ('utime_ms', 'u8', (self.n_adcs,)), # unix time [ms since epoch] + ('tai_ns', 'u8', (self.n_adcs,)), # time since PPS [ns] + ('wvfm_valid', 'u1', (self.n_adcs, self.n_channels)) # boolean, 1 if channel present in event + ]) def wvfm_dtype(self): return np.dtype([ ('samples', 'i2', (self.n_adcs, self.n_channels, self.n_samples)) # sample value @@ -105,23 +105,24 @@ def __init__(self, **params): self.sn_table = params['sn_table'] - self.input_file = adc64format.ADC64Reader(self.input_filename,self.n_adcs) + self.input_file = adc64format.MPDReader(self.input_filename,self.n_adcs) self.input_file.open() # Read run info - _, self.nbytes_runinfo, self.runinfo =adc64format.parse_run_start(self.input_file.stream) + _, self.nbytes_runinfo, self.runinfo =adc64format.mpd_parse_run_start(self.input_file.stream) # use the first file for the event reference - _, self.chunk_size, test_event = adc64format.parse_chunk(self.input_file.stream) + _, self.chunk_size, test_event = adc64format.mpd_parse_chunk(self.input_file.stream) ndevices = len(test_event['data']) total_length_b = self.input_file.stream.seek(0, 2)-self.nbytes_runinfo self.input_file.reset() self.n_samples = test_event['data'][0].dtype['voltage'].shape[-1] + #Positions in #chunks/events (not bytes) self.end_position = total_length_b // self.chunk_size if self.end_position is None else min(self.end_position, total_length_b // self.chunk_size) - self.start_position = self.nbytes_runinfo if self.start_position is None else self.start_position + self.start_position = 0 if self.start_position is None else self.start_position self.curr_position = self.start_position # skip to start position - self.input_file.stream.seek(self.start_position, 0) + self.input_file.stream.seek(self.nbytes_runinfo, 0) def __len__(self): return (self.end_position - self.start_position) // (self.batch_size) @@ -166,9 +167,9 @@ def next(self): matched_events = None if self.rank == 0: # only read from single process - matched_events = [self.input_file.next(self.batch_size)] #FIXME: only works with batch_size=1 at the moment (to be fixed in adc64format) + matched_events = self.input_file.next(self.batch_size) #FIXME: only works with batch_size=1 at the moment (to be fixed in adc64format) # format events into output shape / structure - if matched_events[0] is not None: + if matched_events is not None: # create new event array / waveform array nevents = len(matched_events) event_arr = np.zeros(nevents, dtype=self.event_dtype) @@ -178,22 +179,32 @@ def next(self): data = np.array(events['data']) device = np.array(events['device']) time = np.array(events['time']) - event_arr[ievent]['event'] = event['event'] - for iadc, sn in enumerate(self.sn_table): - data_index = np.where(device["serial"] == sn)[0] - channels = data[data_index]['channel'] - event_arr[ievent]['sn'][iadc] = device[data_index]['serial'] - event_arr[ievent]['utime_ms'][iadc] = 0 - event_arr[ievent]['tai_ns'][iadc] = time[data_index]['tai_s']*1e9 + time[data_index]['tai_ns'] - event_arr[ievent]['wvfm_valid'][iadc, channels] = True - wvfm_arr[ievent]['samples'][iadc, channels] = data[data_index]['voltage'] + try: + event_arr[ievent]['event'] = event['event'] + for iadc, sn in enumerate(self.sn_table): + data_index = np.where(device["serial"] == sn)[0] + channels = data[data_index]['channel'] + event_arr[ievent]['sn'][iadc] = device[data_index]['serial'] + event_arr[ievent]['utime_ms'][iadc] = 0 + event_arr[ievent]['tai_ns'][iadc] = time[data_index]['tai_s']*1e9 + time[data_index]['tai_ns'] + event_arr[ievent]['wvfm_valid'][iadc, channels] = True + wvfm_arr[ievent]['samples'][iadc, channels] = data[data_index]['voltage'] + except: + continue # apply different clock frequency event_arr['tai_ns'] = (event_arr['tai_ns'] * self.clock_timestamp_factor).astype(event_arr.dtype['tai_ns'].base) + # mask off any totally empty events mask = np.any(event_arr['wvfm_valid'], axis=(-1,-2)) event_arr = event_arr[mask] wvfm_arr = wvfm_arr[mask] + + # mask off any extraneous events + if self.curr_position + len(event_arr) > self.end_position: + mask = self.curr_position + np.arange(len(event_arr)) < self.end_position + event_arr = event_arr[mask] + wvfm_arr = wvfm_arr[mask] self.curr_position += len(event_arr) else: event_arr = np.empty((0,), dtype=self.event_dtype) diff --git a/yamls/proto_nd_flow/reco/light/LightADC64EventGenerator.yaml b/yamls/proto_nd_flow/reco/light/LightADC64EventGenerator.yaml index 09cb95d9..3f5ed968 100644 --- a/yamls/proto_nd_flow/reco/light/LightADC64EventGenerator.yaml +++ b/yamls/proto_nd_flow/reco/light/LightADC64EventGenerator.yaml @@ -19,6 +19,6 @@ params: - 215564600 sync_channel: 0 sync_threshold: 40000 - batch_size: 1 #FIXME: only works with batch_size=1 at the moment (to be fixed in adc64format) + batch_size: 12 utime_ms_window: 1000 tai_ns_window: 1000 From f73cbed87ded923b77973754fa4e21ce7750f0de Mon Sep 17 00:00:00 2001 From: Angela White <123964490+AWh1t3@users.noreply.github.com> Date: Mon, 1 Apr 2024 14:52:35 -0500 Subject: [PATCH 2/8] LightADC64EventGenerator.yaml: re-ordered the adcs --- .../reco/light/LightADC64EventGenerator.yaml | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/yamls/proto_nd_flow/reco/light/LightADC64EventGenerator.yaml b/yamls/proto_nd_flow/reco/light/LightADC64EventGenerator.yaml index 3f5ed968..3659b394 100644 --- a/yamls/proto_nd_flow/reco/light/LightADC64EventGenerator.yaml +++ b/yamls/proto_nd_flow/reco/light/LightADC64EventGenerator.yaml @@ -9,14 +9,14 @@ params: n_adcs: 8 n_channels: 64 sn_table: - - 642823167 - - 642874879 - - 215553019 - - 215563696 - - 215537224 - - 632779263 - - 642504959 - - 215564600 + - 215564600 # Mod0, Slot 3 + - 215537224 # Mod0, Slot 5 + - 642823167 # Mod1, Slot 7 + - 215553019 # Mod1, Slot 9 + - 215564617 # Mod2, Slot 11 + - 642874879 # Mod2, Slot 13 + - 642504959 # Mod3, Slot 15 + - 632779263 # Mod3, Slot 17 sync_channel: 0 sync_threshold: 40000 batch_size: 12 From b6bea7f433b4cc8a1f4d51c3d231fef7d28afe56 Mon Sep 17 00:00:00 2001 From: Livio Date: Tue, 30 Apr 2024 20:43:00 -0700 Subject: [PATCH 3/8] Add unix timestamps to light events --- env-nompi.yaml | 2 +- env.yaml | 2 +- src/proto_nd_flow/reco/light/adc64_event_generator.py | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/env-nompi.yaml b/env-nompi.yaml index a4a735ee..ca03ed94 100644 --- a/env-nompi.yaml +++ b/env-nompi.yaml @@ -10,5 +10,5 @@ dependencies: - pip - pip: - h5flow>=0.1.0 - - adc64format>=0.1.0 + - adc64format>=0.1.1 - pylandau diff --git a/env.yaml b/env.yaml index a4a735ee..ca03ed94 100644 --- a/env.yaml +++ b/env.yaml @@ -10,5 +10,5 @@ dependencies: - pip - pip: - h5flow>=0.1.0 - - adc64format>=0.1.0 + - adc64format>=0.1.1 - pylandau diff --git a/src/proto_nd_flow/reco/light/adc64_event_generator.py b/src/proto_nd_flow/reco/light/adc64_event_generator.py index 805195ce..9829a6c2 100644 --- a/src/proto_nd_flow/reco/light/adc64_event_generator.py +++ b/src/proto_nd_flow/reco/light/adc64_event_generator.py @@ -167,7 +167,7 @@ def next(self): matched_events = None if self.rank == 0: # only read from single process - matched_events = self.input_file.next(self.batch_size) #FIXME: only works with batch_size=1 at the moment (to be fixed in adc64format) + matched_events = self.input_file.next(self.batch_size) # format events into output shape / structure if matched_events is not None: # create new event array / waveform array @@ -185,7 +185,7 @@ def next(self): data_index = np.where(device["serial"] == sn)[0] channels = data[data_index]['channel'] event_arr[ievent]['sn'][iadc] = device[data_index]['serial'] - event_arr[ievent]['utime_ms'][iadc] = 0 + event_arr[ievent]['utime_ms'][iadc] = event['unix_ms'] event_arr[ievent]['tai_ns'][iadc] = time[data_index]['tai_s']*1e9 + time[data_index]['tai_ns'] event_arr[ievent]['wvfm_valid'][iadc, channels] = True wvfm_arr[ievent]['samples'][iadc, channels] = data[data_index]['voltage'] From b84ba56e46ba7c3ad042536a98994322ff1080f4 Mon Sep 17 00:00:00 2001 From: Livio Date: Tue, 30 Apr 2024 21:44:57 -0700 Subject: [PATCH 4/8] Enable loading ADC serial numbers from data file --- .../reco/light/adc64_event_generator.py | 23 ++++++++++++------- .../reco/light/LightADC64EventGenerator.yaml | 16 ++++++------- 2 files changed, 23 insertions(+), 16 deletions(-) diff --git a/src/proto_nd_flow/reco/light/adc64_event_generator.py b/src/proto_nd_flow/reco/light/adc64_event_generator.py index 9829a6c2..5e084595 100644 --- a/src/proto_nd_flow/reco/light/adc64_event_generator.py +++ b/src/proto_nd_flow/reco/light/adc64_event_generator.py @@ -17,7 +17,6 @@ class LightADC64EventGenerator(H5FlowGenerator): Parameters: - ``wvfm_dset_name`` : ``str``, required, path to dataset to store raw waveforms - - ``sn_table`` : ``list`` of ``int``, required, serial number of each ADC (determines order of the ADCs in the output data type) - ``n_adcs`` : ``int``, number of ADC serial numbers (default = 2) - ``n_channels`` : ``int``, number of channels per ADC (default = 64) - ``sync_channel`` : ``int``, channel index to use for identifying sync events (default = 32) @@ -25,6 +24,8 @@ class LightADC64EventGenerator(H5FlowGenerator): - ``sync_buffer`` : ``int``, optional, number of events to scan to find the first sync for each event builder process, only relevant if using MPI - ``clock_timestamp_factor`` : ``float``, tick size for ``tai_ns`` in raw data [ns] (default = 0.625) - ``batch_size`` : ``int``, optional, number of events to buffer before initiating next loop iteration (default = 128) + - ``sn_table`` : ``list`` of ``int``, optional, serial number of each ADC (determines order of the ADCs in the output data type, + if not give order like in data file) Generates a lightweight "event" dataset along with a dataset containing @@ -103,7 +104,6 @@ def __init__(self, **params): self.wvfm_dset_name = params['wvfm_dset_name'] self.event_dset_name = self.dset_name - self.sn_table = params['sn_table'] self.input_file = adc64format.MPDReader(self.input_filename,self.n_adcs) self.input_file.open() @@ -116,6 +116,11 @@ def __init__(self, **params): self.input_file.reset() self.n_samples = test_event['data'][0].dtype['voltage'].shape[-1] + if 'sn_table' in params: + self.sn_table = [int(value,16) for value in params['sn_table']] + else: + self.sn_table = [device["serial"].item() for device in test_event["device"]] + #Positions in #chunks/events (not bytes) self.end_position = total_length_b // self.chunk_size if self.end_position is None else min(self.end_position, total_length_b // self.chunk_size) self.start_position = 0 if self.start_position is None else self.start_position @@ -175,22 +180,24 @@ def next(self): event_arr = np.zeros(nevents, dtype=self.event_dtype) wvfm_arr = np.zeros(nevents, dtype=self.wvfm_dtype) for ievent,events in enumerate(matched_events): + if not events: + continue event = events['event'] data = np.array(events['data']) device = np.array(events['device']) time = np.array(events['time']) - try: - event_arr[ievent]['event'] = event['event'] - for iadc, sn in enumerate(self.sn_table): - data_index = np.where(device["serial"] == sn)[0] + event_arr[ievent]['event'] = event['event'] + for iadc, sn in enumerate(self.sn_table): + data_index = np.where(device["serial"] == sn)[0] + if len(data_index): channels = data[data_index]['channel'] event_arr[ievent]['sn'][iadc] = device[data_index]['serial'] event_arr[ievent]['utime_ms'][iadc] = event['unix_ms'] event_arr[ievent]['tai_ns'][iadc] = time[data_index]['tai_s']*1e9 + time[data_index]['tai_ns'] event_arr[ievent]['wvfm_valid'][iadc, channels] = True wvfm_arr[ievent]['samples'][iadc, channels] = data[data_index]['voltage'] - except: - continue + else: + print("ADC", hex(sn)," not found") # apply different clock frequency event_arr['tai_ns'] = (event_arr['tai_ns'] * self.clock_timestamp_factor).astype(event_arr.dtype['tai_ns'].base) diff --git a/yamls/proto_nd_flow/reco/light/LightADC64EventGenerator.yaml b/yamls/proto_nd_flow/reco/light/LightADC64EventGenerator.yaml index 3659b394..8f7d60aa 100644 --- a/yamls/proto_nd_flow/reco/light/LightADC64EventGenerator.yaml +++ b/yamls/proto_nd_flow/reco/light/LightADC64EventGenerator.yaml @@ -9,14 +9,14 @@ params: n_adcs: 8 n_channels: 64 sn_table: - - 215564600 # Mod0, Slot 3 - - 215537224 # Mod0, Slot 5 - - 642823167 # Mod1, Slot 7 - - 215553019 # Mod1, Slot 9 - - 215564617 # Mod2, Slot 11 - - 642874879 # Mod2, Slot 13 - - 642504959 # Mod3, Slot 15 - - 632779263 # Mod3, Slot 17 + - '0xcd94138' # Mod0, Slot 3 + - '0xcd8d648' # Mod0, Slot 5 + - '0x2650b3ff' # Mod1, Slot 7 + - '0xcd913fb' # Mod1, Slot 9 + - '0xcd94149' # Mod2, Slot 11 + - '0x26517dff' # Mod2, Slot 13 + - '0x264bd8ff' # Mod3, Slot 15 + - '0x25b771ff' # Mod3, Slot 17 sync_channel: 0 sync_threshold: 40000 batch_size: 12 From c416921f26135f44c2262c96371e7e15a79383d1 Mon Sep 17 00:00:00 2001 From: Livio Date: Sun, 12 May 2024 15:06:07 -0700 Subject: [PATCH 5/8] Add renamed light data event builder stages (for 2x2) --- .../reco/light/mpd_event_generator.py | 242 ++++++++++++++++++ .../reco/light/LightMPDEventGenerator.yaml | 24 ++ .../light/light_event_building_mpd.yaml | 9 + 3 files changed, 275 insertions(+) create mode 100644 src/proto_nd_flow/reco/light/mpd_event_generator.py create mode 100644 yamls/proto_nd_flow/reco/light/LightMPDEventGenerator.yaml create mode 100644 yamls/proto_nd_flow/workflows/light/light_event_building_mpd.yaml diff --git a/src/proto_nd_flow/reco/light/mpd_event_generator.py b/src/proto_nd_flow/reco/light/mpd_event_generator.py new file mode 100644 index 00000000..ef5e5425 --- /dev/null +++ b/src/proto_nd_flow/reco/light/mpd_event_generator.py @@ -0,0 +1,242 @@ +import numpy as np +import h5py +import os +import logging +from math import ceil + +from h5flow.core import H5FlowGenerator +from h5flow import H5FLOW_MPI + +import adc64format + + +class LightMPDEventGenerator(H5FlowGenerator): + ''' + Light system event builder - converts multiple ADC64-formatted light files to an event-packed + h5flow-readable format. Uses the ``adc64format`` library. + + Parameters: + - ``wvfm_dset_name`` : ``str``, required, path to dataset to store raw waveforms + - ``n_adcs`` : ``int``, number of ADC serial numbers (default = 2) + - ``n_channels`` : ``int``, number of channels per ADC (default = 64) + - ``sync_channel`` : ``int``, channel index to use for identifying sync events (default = 32) + - ``sync_threshold`` : ``int``, threshold for identifying sync events (default = 5000) [ADC] + - ``sync_buffer`` : ``int``, optional, number of events to scan to find the first sync for each event builder process, only relevant if using MPI + - ``clock_timestamp_factor`` : ``float``, tick size for ``tai_ns`` in raw data [ns] (default = 0.625) + - ``batch_size`` : ``int``, optional, number of events to buffer before initiating next loop iteration (default = 128) + - ``sn_table`` : ``list`` of ``int``, optional, serial number of each ADC (determines order of the ADCs in the output data type, + if not give order like in data file) + + + Generates a lightweight "event" dataset along with a dataset containing + event-packed raw waveforms. + + Example config:: + + flow: + source: light_event_generator + stages: [] + + light_event_generator: + classname: LightADC64EventGenerator + path: module0_flow.reco.light.hdf5_event_generator + dset_name: 'light/events' + params: + wvfm_dset_name: 'light/wvfm' + n_adcs: 2 + n_channels: 64 + sync_channel: 32 + sn_table: + - 175780172 + - 175854781 + batch_size: 128 + utime_ms_window: 1000 + tai_ns_window: 1000 + + ``events`` datatype:: + + id u8, unique identifier per event + event i4, event number from source ROOT file + sn i4(n_adcs,), serial number of adc + ch u1(n_adcs,n_channels), channel id + utime_ms u8(n_adcs,n_channels), unix time since epoch [ms] + tai_ns u8(n_adcs,n_channels), time since PPS [ns] + wvfm_valid u1(n_adcs,n_channels), boolean indicator if channel is present in event + + ``wvfm`` datatype:: + + samples i2(n_adc,n_channels,n_samples), sample 10-bit ADC value (lowest 6 bits are not used) + ''' + defaults = dict( + n_adcs = 8, + n_channels = 64, + batch_size = 64, + sync_channel = 0, + sync_threshold = 40000, + sync_buffer = 200, + clock_timestamp_factor = 1.0, + utime_ms_window = 1000, + tai_ns_window = 1000, + ) + + def event_dtype(self): return np.dtype([ + ('id', 'u8'), # unique identifier + ('event', 'i4'), # event number in data file + ('sn', 'i4', (self.n_adcs,)), # adc serial number + #('ch', 'u1', (self.n_adcs, self.n_channels)), # channel number + ('utime_ms', 'u8', (self.n_adcs,)), # unix time [ms since epoch] + ('tai_ns', 'u8', (self.n_adcs,)), # time since PPS [ns] + ('wvfm_valid', 'u1', (self.n_adcs, self.n_channels)) # boolean, 1 if channel present in event + ]) + + def wvfm_dtype(self): return np.dtype([ + ('samples', 'i2', (self.n_adcs, self.n_channels, self.n_samples)) # sample value + ]) + + def __init__(self, **params): + super(LightADC64EventGenerator, self).__init__(**params) + + # set up parameters + for key,val in self.defaults.items(): + setattr(self, key, params.get(key,val)) + self.n_samples = 0 + + self.wvfm_dset_name = params['wvfm_dset_name'] + self.event_dset_name = self.dset_name + + + self.input_file = adc64format.MPDReader(self.input_filename,self.n_adcs) + self.input_file.open() + # Read run info + _, self.nbytes_runinfo, self.runinfo =adc64format.mpd_parse_run_start(self.input_file.stream) + # use the first file for the event reference + _, self.chunk_size, test_event = adc64format.mpd_parse_chunk(self.input_file.stream) + ndevices = len(test_event['data']) + total_length_b = self.input_file.stream.seek(0, 2)-self.nbytes_runinfo + self.input_file.reset() + self.n_samples = test_event['data'][0].dtype['voltage'].shape[-1] + + if 'sn_table' in params: + self.sn_table = [int(value,16) for value in params['sn_table']] + else: + self.sn_table = [device["serial"].item() for device in test_event["device"]] + + #Positions in #chunks/events (not bytes) + self.end_position = total_length_b // self.chunk_size if self.end_position is None else min(self.end_position, total_length_b // self.chunk_size) + self.start_position = 0 if self.start_position is None else self.start_position + self.curr_position = self.start_position + + # skip to start position + self.input_file.stream.seek(self.nbytes_runinfo, 0) + + def __len__(self): + return (self.end_position - self.start_position) // (self.batch_size) + + def finish(self): + self.input_file.close() + super(LightADC64EventGenerator, self).finish() + + def init(self): + super(LightADC64EventGenerator, self).init() + + # fix dataset dtypes + self.event_dtype = self.event_dtype() + self.wvfm_dtype = self.wvfm_dtype() + + # initialize data objects + self.data_manager.create_dset(self.event_dset_name, dtype=self.event_dtype) + self.data_manager.create_dset(self.wvfm_dset_name, dtype=self.wvfm_dtype) + self.data_manager.create_ref(self.event_dset_name, self.wvfm_dset_name) + params = dict([(key,getattr(self,key)) for key in self.defaults]) + params['sn_table'] = self.sn_table + params['n_samples'] = self.n_samples + self.data_manager.set_attrs(self.event_dset_name, + classname=self.classname, + class_version=self.class_version, + start_position=self.start_position, + end_position=self.end_position, + input_filename=self.input_filename, + wvfm_dset_name=self.wvfm_dset_name, + **params + ) + + def finish(self): + super(LightADC64EventGenerator, self).finish() + self.input_file.close() + + @staticmethod + def valid_array(arr): + return arr is not None + + def next(self): + matched_events = None + if self.rank == 0: + # only read from single process + matched_events = self.input_file.next(self.batch_size) + # format events into output shape / structure + if matched_events is not None: + # create new event array / waveform array + nevents = len(matched_events) + event_arr = np.zeros(nevents, dtype=self.event_dtype) + wvfm_arr = np.zeros(nevents, dtype=self.wvfm_dtype) + for ievent,events in enumerate(matched_events): + if not events: + continue + event = events['event'] + data = np.array(events['data']) + device = np.array(events['device']) + time = np.array(events['time']) + event_arr[ievent]['event'] = event['event'] + for iadc, sn in enumerate(self.sn_table): + data_index = np.where(device["serial"] == sn)[0] + if len(data_index): + channels = data[data_index]['channel'] + event_arr[ievent]['sn'][iadc] = device[data_index]['serial'] + event_arr[ievent]['utime_ms'][iadc] = event['unix_ms'] + event_arr[ievent]['tai_ns'][iadc] = time[data_index]['tai_s']*1e9 + time[data_index]['tai_ns'] + event_arr[ievent]['wvfm_valid'][iadc, channels] = True + wvfm_arr[ievent]['samples'][iadc, channels] = data[data_index]['voltage'] + else: + print("ADC", hex(sn)," not found") + + # apply different clock frequency + event_arr['tai_ns'] = (event_arr['tai_ns'] * self.clock_timestamp_factor).astype(event_arr.dtype['tai_ns'].base) + + # mask off any totally empty events + mask = np.any(event_arr['wvfm_valid'], axis=(-1,-2)) + event_arr = event_arr[mask] + wvfm_arr = wvfm_arr[mask] + + # mask off any extraneous events + if self.curr_position + len(event_arr) > self.end_position: + mask = self.curr_position + np.arange(len(event_arr)) < self.end_position + event_arr = event_arr[mask] + wvfm_arr = wvfm_arr[mask] + self.curr_position += len(event_arr) + else: + event_arr = np.empty((0,), dtype=self.event_dtype) + wvfm_arr = np.empty((0,), dtype=self.wvfm_dtype) + # write event to file + event_slice = self.data_manager.reserve_data(self.event_dset_name, len(event_arr)) + event_arr['id'] = np.arange(event_slice.start, event_slice.stop) + self.data_manager.write_data(self.event_dset_name, event_slice, event_arr) + + self.data_manager.reserve_data(self.wvfm_dset_name, event_slice) + self.data_manager.write_data(self.wvfm_dset_name, event_slice, wvfm_arr) + + # set up references + # just event -> wvfm 1:1 refs + ref = np.c_[event_arr['id'], event_arr['id']] + self.data_manager.write_ref(self.event_dset_name, self.wvfm_dset_name, ref) + + # if using MPI, divy up data across processes + if H5FLOW_MPI: + event_slice = self.comm.bcast(event_slice) + n = ceil((event_slice.stop - event_slice.start) / self.size) + start = event_slice.start + self.rank * n + stop = min(event_slice.stop, event_slice.start + (self.rank+1) * n) + event_slice = slice(start, stop) + + if len(event_arr): + return event_slice + return H5FlowGenerator.EMPTY diff --git a/yamls/proto_nd_flow/reco/light/LightMPDEventGenerator.yaml b/yamls/proto_nd_flow/reco/light/LightMPDEventGenerator.yaml new file mode 100644 index 00000000..b9523693 --- /dev/null +++ b/yamls/proto_nd_flow/reco/light/LightMPDEventGenerator.yaml @@ -0,0 +1,24 @@ +classname: LightMPDEventGenerator +path: proto_nd_flow.reco.light.mpd_event_generator +dset_name: 'light/events' +params: + # secondary output + wvfm_dset_name: 'light/wvfm' + + # configuration parameters + n_adcs: 8 + n_channels: 64 + sn_table: + - '0xcd94138' # Mod0, Slot 3 + - '0xcd8d648' # Mod0, Slot 5 + - '0x2650b3ff' # Mod1, Slot 7 + - '0xcd913fb' # Mod1, Slot 9 + - '0xcd94149' # Mod2, Slot 11 + - '0x26517dff' # Mod2, Slot 13 + - '0x264bd8ff' # Mod3, Slot 15 + - '0x25b771ff' # Mod3, Slot 17 + sync_channel: 0 + sync_threshold: 40000 + batch_size: 12 + utime_ms_window: 1000 + tai_ns_window: 1000 diff --git a/yamls/proto_nd_flow/workflows/light/light_event_building_mpd.yaml b/yamls/proto_nd_flow/workflows/light/light_event_building_mpd.yaml new file mode 100644 index 00000000..f1e8b978 --- /dev/null +++ b/yamls/proto_nd_flow/workflows/light/light_event_building_mpd.yaml @@ -0,0 +1,9 @@ +# Generates the low-level event built data for light data (i.e. grouped raw +# waveforms) + +flow: + source: light_event_generator + +light_event_generator: + !include yamls/proto_nd_flow/reco/light/LightMPDEventGenerator.yaml + From ff34bfe6deda3b382ef5829ef0ec91eac0f7f3d2 Mon Sep 17 00:00:00 2001 From: Livio Date: Sun, 12 May 2024 15:19:25 -0700 Subject: [PATCH 6/8] Revert previous commits to bring adc64_event_generator back to original form --- .../reco/light/adc64_event_generator.py | 58 +++++++------------ .../reco/light/LightADC64EventGenerator.yaml | 18 +++--- 2 files changed, 29 insertions(+), 47 deletions(-) diff --git a/src/proto_nd_flow/reco/light/adc64_event_generator.py b/src/proto_nd_flow/reco/light/adc64_event_generator.py index 5e084595..73dcb747 100644 --- a/src/proto_nd_flow/reco/light/adc64_event_generator.py +++ b/src/proto_nd_flow/reco/light/adc64_event_generator.py @@ -17,6 +17,7 @@ class LightADC64EventGenerator(H5FlowGenerator): Parameters: - ``wvfm_dset_name`` : ``str``, required, path to dataset to store raw waveforms + - ``sn_table`` : ``list`` of ``int``, required, serial number of each ADC (determines order of the ADCs in the output data type) - ``n_adcs`` : ``int``, number of ADC serial numbers (default = 2) - ``n_channels`` : ``int``, number of channels per ADC (default = 64) - ``sync_channel`` : ``int``, channel index to use for identifying sync events (default = 32) @@ -24,8 +25,6 @@ class LightADC64EventGenerator(H5FlowGenerator): - ``sync_buffer`` : ``int``, optional, number of events to scan to find the first sync for each event builder process, only relevant if using MPI - ``clock_timestamp_factor`` : ``float``, tick size for ``tai_ns`` in raw data [ns] (default = 0.625) - ``batch_size`` : ``int``, optional, number of events to buffer before initiating next loop iteration (default = 128) - - ``sn_table`` : ``list`` of ``int``, optional, serial number of each ADC (determines order of the ADCs in the output data type, - if not give order like in data file) Generates a lightweight "event" dataset along with a dataset containing @@ -83,11 +82,11 @@ def event_dtype(self): return np.dtype([ ('id', 'u8'), # unique identifier ('event', 'i4'), # event number in data file ('sn', 'i4', (self.n_adcs,)), # adc serial number - #('ch', 'u1', (self.n_adcs, self.n_channels)), # channel number - ('utime_ms', 'u8', (self.n_adcs,)), # unix time [ms since epoch] - ('tai_ns', 'u8', (self.n_adcs,)), # time since PPS [ns] - ('wvfm_valid', 'u1', (self.n_adcs, self.n_channels)) # boolean, 1 if channel present in event - ]) + ('ch', 'u1', (self.n_adcs, self.n_channels)), # channel number + ('utime_ms', 'u8', (self.n_adcs, self.n_channels)), # unix time [ms since epoch] + ('tai_ns', 'u8', (self.n_adcs, self.n_channels)), # time since PPS [ns] + ('wvfm_valid', 'u1', (self.n_adcs, self.n_channels)) # boolean, 1 if channel present in event + ]) def wvfm_dtype(self): return np.dtype([ ('samples', 'i2', (self.n_adcs, self.n_channels, self.n_samples)) # sample value @@ -104,30 +103,25 @@ def __init__(self, **params): self.wvfm_dset_name = params['wvfm_dset_name'] self.event_dset_name = self.dset_name + self.sn_table = params['sn_table'] - self.input_file = adc64format.MPDReader(self.input_filename,self.n_adcs) + self.input_file = adc64format.ADC64Reader(self.input_filename,self.n_adcs) self.input_file.open() # Read run info - _, self.nbytes_runinfo, self.runinfo =adc64format.mpd_parse_run_start(self.input_file.stream) + _, self.nbytes_runinfo, self.runinfo =adc64format.parse_run_start(self.input_file.stream) # use the first file for the event reference - _, self.chunk_size, test_event = adc64format.mpd_parse_chunk(self.input_file.stream) + _, self.chunk_size, test_event = adc64format.parse_chunk(self.input_file.stream) ndevices = len(test_event['data']) total_length_b = self.input_file.stream.seek(0, 2)-self.nbytes_runinfo self.input_file.reset() self.n_samples = test_event['data'][0].dtype['voltage'].shape[-1] - if 'sn_table' in params: - self.sn_table = [int(value,16) for value in params['sn_table']] - else: - self.sn_table = [device["serial"].item() for device in test_event["device"]] - - #Positions in #chunks/events (not bytes) self.end_position = total_length_b // self.chunk_size if self.end_position is None else min(self.end_position, total_length_b // self.chunk_size) - self.start_position = 0 if self.start_position is None else self.start_position + self.start_position = self.nbytes_runinfo if self.start_position is None else self.start_position self.curr_position = self.start_position # skip to start position - self.input_file.stream.seek(self.nbytes_runinfo, 0) + self.input_file.stream.seek(self.start_position, 0) def __len__(self): return (self.end_position - self.start_position) // (self.batch_size) @@ -172,16 +166,14 @@ def next(self): matched_events = None if self.rank == 0: # only read from single process - matched_events = self.input_file.next(self.batch_size) + matched_events = [self.input_file.next(self.batch_size)] #FIXME: only works with batch_size=1 at the moment (to be fixed in adc64format) # format events into output shape / structure - if matched_events is not None: + if matched_events[0] is not None: # create new event array / waveform array nevents = len(matched_events) event_arr = np.zeros(nevents, dtype=self.event_dtype) wvfm_arr = np.zeros(nevents, dtype=self.wvfm_dtype) for ievent,events in enumerate(matched_events): - if not events: - continue event = events['event'] data = np.array(events['data']) device = np.array(events['device']) @@ -189,29 +181,19 @@ def next(self): event_arr[ievent]['event'] = event['event'] for iadc, sn in enumerate(self.sn_table): data_index = np.where(device["serial"] == sn)[0] - if len(data_index): - channels = data[data_index]['channel'] - event_arr[ievent]['sn'][iadc] = device[data_index]['serial'] - event_arr[ievent]['utime_ms'][iadc] = event['unix_ms'] - event_arr[ievent]['tai_ns'][iadc] = time[data_index]['tai_s']*1e9 + time[data_index]['tai_ns'] - event_arr[ievent]['wvfm_valid'][iadc, channels] = True - wvfm_arr[ievent]['samples'][iadc, channels] = data[data_index]['voltage'] - else: - print("ADC", hex(sn)," not found") + channels = data[data_index]['channel'] + event_arr[ievent]['sn'][iadc] = device[data_index]['serial'] + event_arr[ievent]['utime_ms'][iadc] = 0 + event_arr[ievent]['tai_ns'][iadc] = time[data_index]['tai_s']*1e9 + time[data_index]['tai_ns'] + event_arr[ievent]['wvfm_valid'][iadc, channels] = True + wvfm_arr[ievent]['samples'][iadc, channels] = data[data_index]['voltage'] # apply different clock frequency event_arr['tai_ns'] = (event_arr['tai_ns'] * self.clock_timestamp_factor).astype(event_arr.dtype['tai_ns'].base) - # mask off any totally empty events mask = np.any(event_arr['wvfm_valid'], axis=(-1,-2)) event_arr = event_arr[mask] wvfm_arr = wvfm_arr[mask] - - # mask off any extraneous events - if self.curr_position + len(event_arr) > self.end_position: - mask = self.curr_position + np.arange(len(event_arr)) < self.end_position - event_arr = event_arr[mask] - wvfm_arr = wvfm_arr[mask] self.curr_position += len(event_arr) else: event_arr = np.empty((0,), dtype=self.event_dtype) diff --git a/yamls/proto_nd_flow/reco/light/LightADC64EventGenerator.yaml b/yamls/proto_nd_flow/reco/light/LightADC64EventGenerator.yaml index 8f7d60aa..09cb95d9 100644 --- a/yamls/proto_nd_flow/reco/light/LightADC64EventGenerator.yaml +++ b/yamls/proto_nd_flow/reco/light/LightADC64EventGenerator.yaml @@ -9,16 +9,16 @@ params: n_adcs: 8 n_channels: 64 sn_table: - - '0xcd94138' # Mod0, Slot 3 - - '0xcd8d648' # Mod0, Slot 5 - - '0x2650b3ff' # Mod1, Slot 7 - - '0xcd913fb' # Mod1, Slot 9 - - '0xcd94149' # Mod2, Slot 11 - - '0x26517dff' # Mod2, Slot 13 - - '0x264bd8ff' # Mod3, Slot 15 - - '0x25b771ff' # Mod3, Slot 17 + - 642823167 + - 642874879 + - 215553019 + - 215563696 + - 215537224 + - 632779263 + - 642504959 + - 215564600 sync_channel: 0 sync_threshold: 40000 - batch_size: 12 + batch_size: 1 #FIXME: only works with batch_size=1 at the moment (to be fixed in adc64format) utime_ms_window: 1000 tai_ns_window: 1000 From dbf5d16e7a74ae81a9c62b3c3f12d7f852a31b58 Mon Sep 17 00:00:00 2001 From: Livio Date: Sun, 12 May 2024 15:30:04 -0700 Subject: [PATCH 7/8] Update light data run script --- scripts/proto_nd_scripts/run_proto_nd_light_flow_data.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/proto_nd_scripts/run_proto_nd_light_flow_data.sh b/scripts/proto_nd_scripts/run_proto_nd_light_flow_data.sh index 43889f96..d1f92a3e 100644 --- a/scripts/proto_nd_scripts/run_proto_nd_light_flow_data.sh +++ b/scripts/proto_nd_scripts/run_proto_nd_light_flow_data.sh @@ -20,7 +20,7 @@ H5FLOW_CMD='h5flow' #H5FLOW_CMD='srun -n32 h5flow' # run all stages -WORKFLOW1='yamls/proto_nd_flow/workflows/light/light_event_building_adc64.yaml' +WORKFLOW1='yamls/proto_nd_flow/workflows/light/light_event_building_mpd.yaml' HERE=`pwd` #cd ndlar_flow From bacedc7d214feabac5c37c06f5d52e2ee4bc1c33 Mon Sep 17 00:00:00 2001 From: Livio Date: Sun, 12 May 2024 15:36:44 -0700 Subject: [PATCH 8/8] Minor bug fixes in mpd_event_generator --- scripts/proto_nd_scripts/run_proto_nd_light_flow_data.sh | 2 +- src/proto_nd_flow/reco/light/mpd_event_generator.py | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/scripts/proto_nd_scripts/run_proto_nd_light_flow_data.sh b/scripts/proto_nd_scripts/run_proto_nd_light_flow_data.sh index d1f92a3e..92a745de 100644 --- a/scripts/proto_nd_scripts/run_proto_nd_light_flow_data.sh +++ b/scripts/proto_nd_scripts/run_proto_nd_light_flow_data.sh @@ -11,7 +11,7 @@ OUTPUT_DIR=`pwd` #!!! change me OUTPUT_NAME=(${INPUT_FILE//"/"/ }) OUTPUT_NAME=${OUTPUT_NAME[-1]} OUTPUT_FILE="${OUTPUT_DIR}/${OUTPUT_NAME}" -OUTPUT_FILE=${OUTPUT_FILE//.h5/.proto_nd_flow.h5} +OUTPUT_FILE=${OUTPUT_FILE//.data/.proto_nd_flow.h5} echo ${OUTPUT_FILE} # for running on a login node diff --git a/src/proto_nd_flow/reco/light/mpd_event_generator.py b/src/proto_nd_flow/reco/light/mpd_event_generator.py index ef5e5425..6a6e7d26 100644 --- a/src/proto_nd_flow/reco/light/mpd_event_generator.py +++ b/src/proto_nd_flow/reco/light/mpd_event_generator.py @@ -94,7 +94,7 @@ def wvfm_dtype(self): return np.dtype([ ]) def __init__(self, **params): - super(LightADC64EventGenerator, self).__init__(**params) + super(LightMPDEventGenerator, self).__init__(**params) # set up parameters for key,val in self.defaults.items(): @@ -134,10 +134,10 @@ def __len__(self): def finish(self): self.input_file.close() - super(LightADC64EventGenerator, self).finish() + super(LightMPDEventGenerator, self).finish() def init(self): - super(LightADC64EventGenerator, self).init() + super(LightMPDEventGenerator, self).init() # fix dataset dtypes self.event_dtype = self.event_dtype() @@ -161,7 +161,7 @@ def init(self): ) def finish(self): - super(LightADC64EventGenerator, self).finish() + super(LightMPDEventGenerator, self).finish() self.input_file.close() @staticmethod