Skip to content

Commit

Permalink
convert exchange checker to use web workers
Browse files Browse the repository at this point in the history
  • Loading branch information
DocOtak committed Dec 14, 2024
1 parent b9009ad commit b948f9c
Show file tree
Hide file tree
Showing 5 changed files with 206 additions and 129 deletions.
202 changes: 74 additions & 128 deletions docs/_exchange_checker_include.html
Original file line number Diff line number Diff line change
@@ -1,151 +1,102 @@
<link rel="stylesheet" href="https://pyscript.net/releases/2024.11.1/core.css">
<script type="module" src="https://pyscript.net/releases/2024.11.1/core.js"></script>
<style>
#log-container{
max-height: 50vh;
overflow: scroll;
display: flex;
flex-direction: column-reverse;
}
</style>

<p>
cchdo.hydro version: <span id="hydro_version"></span><br />
cchdo.params version: <span id="params_version"></span>
</p>
<p>
<label>Add an exchange file (csv or zip) <input type="file" id="ex_file" name="ex_file"></label>
<label>Add an exchange file (csv or zip) <input type="file" id="ex_file" name="ex_file" disabled></label>
<h4>Options</h4>
<label><input id="checks_flags" type="checkbox" checked> Check Flags</label>
<p>
<button class="sd-sphinx-override sd-btn sd-text-wrap sd-btn-primary reference internal" id="process_exchange" py-click="_process_exchange">Process Exchange</button>
<!-- Starts disabled: the page script sets `.disabled = False` once the worker is ready.
     `enabled="False"` is not an HTML attribute and left the button clickable during load. -->
<button class="sd-sphinx-override sd-btn sd-text-wrap sd-btn-primary reference internal" id="process_exchange" py-click="_process_exchange" disabled>Process Exchange</button>
</p>
<p>
<div id='output'>
<span id="status"></span>
<span id="status">Loading Python Runtime</span>
<br />
</div>
<script>
addEventListener('py:ready', () => console.log("done"));
</script>
<h4>Log Console</h4>
<div>
<div id="log-container">
<code id='log'>
</code>
</div>
</div>
<script type="py" src="./_static/convert_exchange.py" service-worker="./_static/sw.js" worker name="my-worker" config='{"packages": ["xarray", "cchdo.hydro<=1.0.2.8", "netcdf4"]}'></script>
<script type="py" src="./_static/derrived_makers.py" service-worker="./_static/sw.js" worker name="derived1" config='{"packages": ["xarray", "cchdo.hydro<=1.0.2.8", "netcdf4" ]}'></script>
<script type="py" src="./_static/derrived_makers.py" service-worker="./_static/sw.js" worker name="derived2" config='{"packages": ["xarray", "cchdo.hydro<=1.0.2.8", "netcdf4" ]}'></script>
<script type="py" src="./_static/derrived_makers.py" service-worker="./_static/sw.js" worker name="derived3" config='{"packages": ["xarray", "cchdo.hydro<=1.0.2.8", "netcdf4" ]}'></script>
<script type="py">
from pyscript import workers, display
from js import document, console, window, Uint8Array, Blob
import asyncio
import io
import traceback

from cchdo.hydro import read_exchange
from cchdo.hydro import accessors

from cchdo.hydro import __version__ as hydro_version
from cchdo.params import __version__ as params_version

document.querySelector("#hydro_version").innerText = hydro_version
document.querySelector("#params_version").innerText = params_version

import logging
import sys

root = logging.getLogger()
root.setLevel(logging.DEBUG)

handler = logging.StreamHandler(sys.stderr)
handler.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
root.addHandler(handler)
import json

def logger(msg):
display(msg, target="log", append=True)

logger("Waiting for Python to be ready")
my_worker = await workers["my-worker"]
my_worker.sync.logger = logger
logger("Python ready")
status = document.querySelector("#status")

versions = await my_worker.versions()
document.querySelector("#hydro_version").innerText = versions.hydro_version
document.querySelector("#params_version").innerText = versions.params_version

document.querySelector("#process_exchange").disabled = False
document.querySelector("#ex_file").disabled = False
status.innerText = "Ready..."

def _handle_success(result):
nc_bytes, nc_fname = result
nc_blob = Blob.new([Uint8Array.new(nc_bytes)], {type : 'application/octet-stream'})
nc_url = window.URL.createObjectURL(nc_blob)
nc_download_link = document.createElement("a")
nc_download_link.href = nc_url
nc_download_link.download = nc_fname
nc_download_link.innerText = f"Downlaod {nc_fname}"
output = document.querySelector("#output")
output.appendChild(nc_download_link)
output.appendChild(document.createElement("br"))
document.querySelector("#process_exchange").disabled = False

async def load_ex_bytes(ex_bytes, checks):
return read_exchange(ex_bytes, checks=checks)
status = document.querySelector("#status")
status.innerText = f""
return nc_bytes

async def to_netcdf(ex):
ex.to_netcdf("out.nc")
with open("out.nc", "rb") as f:
return f.read()
async def _make_derived(nc_bytes):
worker1 = await workers["derived1"]
worker2 = await workers["derived2"]
worker3 = await workers["derived3"]

async def to_coards(ex):
return ex.cchdo.to_coards()
worker1.make_derived(nc_bytes, "to_sum").then(_handle_success)
worker2.make_derived(nc_bytes, "to_woce").then(_handle_success)
# TODO make this not need scipy
#worker3.make_derived(nc_bytes, "to_coards").then(_handle_success)

async def to_woce(ex):
return ex.cchdo.to_woce()

async def to_xarray_callback(arg):
bytes = bytearray(Uint8Array.new(arg))
ex_bytes = io.BytesIO(bytes)
def _handle_submit(arr_buffer):
status = document.querySelector("#status")
check_flags = document.querySelector("#checks_flags").checked
checks = {
"flags": check_flags
}
try:
ex = await load_ex_bytes(ex_bytes, checks=checks)
except ValueError as er:
traceback.print_exception(er)
status.innerText = f"Failure see traceback..."
document.querySelector("#process_exchange").disabled = False
return
status.innerText = f"Success, generating files"
await asyncio.sleep(0.1)
try:
nc = await to_netcdf(ex)
nc_blob = Blob.new([Uint8Array.new(nc)], {type : 'application/netcdf'})
nc_url = window.URL.createObjectURL(nc_blob)
nc_download_link = document.createElement("a")
nc_download_link.href = nc_url
nc_fname = ex.cchdo.gen_fname()
nc_download_link.download = nc_fname
nc_download_link.innerText = f"Download CF/netCDF: {nc_fname}"
output = document.querySelector("#output")
output.appendChild(nc_download_link)
output.appendChild(document.createElement("br"))

except Exception as er:
status.innerText = f"Could not generate CF/netCDF"

await asyncio.sleep(0.1)
#status.element.innerText = f"Generating COARDS netCDF (very slow)"
#await asyncio.sleep(0.1)
#try:
# coards = await to_coards(ex)
# coards_blob = Blob.new([Uint8Array.new(coards)], {type : 'application/octet-stream'})
# coards_url = window.URL.createObjectURL(coards_blob)
# coards_download_link = document.createElement("a")
# coards_download_link.href = coards_url
# coards_download_link.download = "coards_nc.zip"
# coards_download_link.innerText = "Download COARDS netcdf zip"
# output = Element("output")
# output.element.appendChild(coards_download_link)
# output.element.appendChild(document.createElement("br"))
#except Exception as ex:
# print(ex)
# status.element.innerText = f"Could not generate COARDS netCDF"

await asyncio.sleep(0.1)
status.innerText = f"Generating WOCE Files"
await asyncio.sleep(0.1)
try:
woce = await to_woce(ex)
woce_blob = Blob.new([Uint8Array.new(woce)], {type : 'application/octet-stream'})
woce_url = window.URL.createObjectURL(woce_blob)
woce_download_link = document.createElement("a")
woce_download_link.href = woce_url
woce_download_link.download = "woce_output.txt"
woce_download_link.innerText = "Download Woce (might be txt or zip)"
output = document.querySelector("#output")
output.appendChild(woce_download_link)
output.appendChild(document.createElement("br"))
except:
status.innerText = f"Could not generate WOCE"

await asyncio.sleep(0.1)
try:
summary = ex.cchdo.to_sum()
summary_blob = Blob.new([Uint8Array.new(summary)], {type : 'application/octet-stream'})
summary_url = window.URL.createObjectURL(summary_blob)
summary_download_link = document.createElement("a")
summary_download_link.href = summary_url
summary_download_link.download = "woce_sum.txt"
summary_download_link.innerText = "Download Woce Sumfile"
output = document.querySelector("#output")
output.appendChild(summary_download_link)
output.appendChild(document.createElement("br"))
except:
status.innerText = f"Could not generate WOCE"
checks = json.dumps({"flags": check_flags})
# objects apparently cannot be passed to the worker directly, so stringify it as JSON
return my_worker.to_xarray(arr_buffer, checks)

def _handle_fail(something):
status = document.querySelector("#status")
status.innerText = f"Failure see traceback..."
document.querySelector("#process_exchange").disabled = False
status.innerText = "File Generation Complete"

def _process_exchange(*args):
document.querySelector("#process_exchange").disabled = True
Expand All @@ -155,14 +106,9 @@ <h4>Options</h4>
file_list = document.querySelector("#ex_file").files
first_item = file_list.item(0)

first_item.arrayBuffer().then(to_xarray_callback)
first_item.arrayBuffer().then(_handle_submit, _handle_fail).then(_handle_success).then(_make_derived).catch(_handle_fail)
except:
status.innerText = "Error, was a file picked?"
document.querySelector("#process_exchange").disabled = False

</script>
<h4>Python Log Console</h4>
<py-terminal true></py-terminal>
<py-config type="toml">
packages = ["xarray", "cchdo.hydro<=1.0.2.8", "netcdf4"]
</py-config>
</script>
72 changes: 72 additions & 0 deletions docs/_static/convert_exchange.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import io
import json
import logging
import traceback
from html import escape

from js import Uint8Array, console # noqa: F401
from pyscript import display # type: ignore

from cchdo.hydro import __version__ as hydro_version
from cchdo.hydro import accessors, read_exchange # noqa: F401
from cchdo.params import __version__ as params_version


def logger(msg):
    """Append ``msg`` to the element with id ``log``.

    The message is handed to PyScript's ``display`` with ``append=True``,
    so entries already shown there are kept.
    """
    display(msg, append=True, target="log")


class DisplaylHandler(logging.Handler):
    """Logging handler that sends each formatted record to ``logger``."""

    def emit(self, record) -> None:
        """Format ``record`` and append the resulting text to the log output.

        ``Handler.format`` is used instead of ``self.formatter.format`` so a
        handler that never had ``setFormatter`` called falls back to the
        logging module's default formatter rather than raising
        ``AttributeError`` on ``None``.
        """
        logger(self.format(record))


# Route every log record (DEBUG and above) from the root logger through the
# display-backed handler so it shows up in the page's log output.
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")

handler = DisplaylHandler()
handler.setLevel(logging.DEBUG)
handler.setFormatter(formatter)

root = logging.getLogger()
root.setLevel(logging.DEBUG)
root.addHandler(handler)


def versions():
    """Return the imported cchdo package versions.

    The result is a dict with the keys ``hydro_version`` and
    ``params_version``.
    """
    return dict(hydro_version=hydro_version, params_version=params_version)


def to_netcdf(ex):
    """Serialize ``ex`` to netCDF and return the file contents as bytes.

    ``ex.to_netcdf`` is asked to write to the fixed path ``out.nc``; that
    file is then read back in binary mode and its contents returned.
    """
    scratch_path = "out.nc"
    ex.to_netcdf(scratch_path)
    with open(scratch_path, "rb") as nc_file:
        payload = nc_file.read()
    return payload


class Pre:
    """Wrap text so it renders as an HTML ``<pre>`` block.

    The wrapped text is kept on ``value``; ``_repr_html_`` returns it
    HTML-escaped inside ``<pre>`` tags.
    """

    def __init__(self, text):
        self.value = text

    def _repr_html_(self):
        """Return the escaped text surrounded by ``<pre>`` tags."""
        safe_text = escape(self.value)
        return "<pre>" + safe_text + "</pre>"


def to_xarray(array_buffer, checks):
    """Read an exchange file and return it as netCDF bytes plus a filename.

    Args:
        array_buffer: File contents from the caller. It is wrapped in a JS
            ``Uint8Array`` and copied into a Python ``bytearray``.
        checks: JSON-encoded string; the decoded value is passed to
            ``read_exchange`` as ``checks``.

    Returns:
        A ``(netcdf_bytes, filename)`` tuple, where the filename comes from
        ``ex.cchdo.gen_fname()``.

    Raises:
        ValueError: Re-raised from ``read_exchange`` after the traceback has
            been displayed in the log, so the caller's promise rejects.
    """
    checks = json.loads(checks)
    logger(checks)
    logger("to_xarray called")
    raw = bytearray(Uint8Array.new(array_buffer))
    logger("got bytes")
    ex_bytes = io.BytesIO(raw)
    try:
        ex = read_exchange(ex_bytes, checks=checks)
    except ValueError as er:
        display(Pre("".join(traceback.format_exception(er))), target="log", append=True)
        # A plain ValueError has no ``error_data``; reading it unconditionally
        # would raise AttributeError and hide the real failure.
        if hasattr(er, "error_data"):
            display(er.error_data, target="log", append=True)
        raise  # this is so the promise rejects and the main thread knows what's up
    logger("success! makeing a netCDF file")
    return to_netcdf(ex), ex.cchdo.gen_fname()


# Names this worker exposes to the main thread; the page script calls
# ``my_worker.to_xarray`` and ``my_worker.versions``.
__export__ = ["to_xarray", "versions"]
31 changes: 31 additions & 0 deletions docs/_static/derrived_makers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import xarray as xr
from js import Uint8Array
from pyscript import display

from cchdo.hydro import accessors # noqa: F401


def logger(msg):
    """Append ``msg`` to the element with id ``log``.

    Delegates to PyScript's ``display`` with ``append=True`` so existing
    entries are preserved.
    """
    display(msg, append=True, target="log")


def load_netcdf(array_buffer):
    """Write the received buffer to ``out.nc`` and load it with xarray.

    The buffer is wrapped in a JS ``Uint8Array``, copied into a
    ``bytearray`` and written in binary mode; the result of
    ``xr.load_dataset`` on that path is returned.
    """
    nc_path = "out.nc"
    with open(nc_path, "wb") as sink:
        payload = bytearray(Uint8Array.new(array_buffer))
        sink.write(payload)
    dataset = xr.load_dataset(nc_path)
    return dataset


def make_derived(array_buffer, type):
    """Build one derived output from netCDF bytes.

    Args:
        array_buffer: netCDF file contents, passed on to ``load_netcdf``.
        type: Which output to build: ``"to_sum"``, ``"to_woce"`` or
            ``"to_coards"``. The name shadows the builtin but is kept
            because it is part of the exported signature.

    Returns:
        A ``(data, filename)`` tuple for the requested output.

    Raises:
        ValueError: If ``type`` is not one of the supported names. Without
            this check the function returned ``None``, which the caller
            cannot unpack into ``(data, filename)``.
    """
    supported = ("to_sum", "to_woce", "to_coards")
    # Reject bad requests before doing any work on the dataset.
    if type not in supported:
        raise ValueError(f"unknown derived type {type!r}; expected one of {supported}")

    logger(type)
    # Named ``ds`` so the local no longer shadows the imported ``xr`` module.
    ds = load_netcdf(array_buffer)
    if type == "to_sum":
        return ds.cchdo.to_sum(), "summary.txt"
    if type == "to_woce":
        return ds.cchdo.to_woce(), ds.cchdo.gen_fname("woce")
    return ds.cchdo.to_coards(), ds.cchdo.gen_fname("coards")


# Names this worker exposes to the main thread; the page script calls
# ``make_derived`` on the ``derived1`` and ``derived2`` workers.
__export__ = [
    "make_derived",
]
28 changes: 28 additions & 0 deletions docs/_static/sw.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*! coi-serviceworker v0.1.7 - Guido Zuidhof and contributors, licensed under MIT */
/*! mini-coi - Andrea Giammarchi and contributors, licensed under MIT */
// One file, two roles, chosen by whether `self.document` exists:
//  - with a document: register this same script as a service worker;
//  - without one: act as that service worker and rewrite response headers.
(({ document: d, navigator: { serviceWorker: s } }) => {
  if (d) {
    // Page context: register the currently executing script, scoped to its
    // `scope` attribute or '.' when the attribute is absent.
    const { currentScript: c } = d;
    s.register(c.src, { scope: c.getAttribute('scope') || '.' }).then(r => {
      // Reload when a new worker version is found ...
      r.addEventListener('updatefound', () => location.reload());
      // ... or when a worker is active but not yet controlling this page.
      if (r.active && !s.controller) location.reload();
    });
  }
  else {
    // Service worker context: activate immediately and claim open clients.
    addEventListener('install', () => skipWaiting());
    addEventListener('activate', e => e.waitUntil(clients.claim()));
    addEventListener('fetch', e => {
      const { request: r } = e;
      // Leave `only-if-cached` requests alone unless they are same-origin mode.
      if (r.cache === 'only-if-cached' && r.mode !== 'same-origin') return;
      e.respondWith(fetch(r).then(r => {
        const { body, status, statusText } = r;
        // Pass through responses with no status or a status above 399 unchanged.
        if (!status || status > 399) return r;
        // Copy the headers and set the cross-origin policy headers.
        // NOTE(review): presumably this makes the page cross-origin isolated
        // for the PyScript workers - confirm against the PyScript docs.
        const h = new Headers(r.headers);
        h.set('Cross-Origin-Opener-Policy', 'same-origin');
        h.set('Cross-Origin-Embedder-Policy', 'require-corp');
        h.set('Cross-Origin-Resource-Policy', 'cross-origin');
        return new Response(body, { status, statusText, headers: h });
      }));
    });
  }
})(self);
2 changes: 1 addition & 1 deletion docs/exchange_checker.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ It will also output the other legacy formats at CCHDO: WOCE, the COARDS netCDF f
This converter is only available in the html/browser versions of the documentation.

:::{note}
Processing a CTD file can take a long time and I don't yet know how to show progress in the browser.
Processing a CTD file can take a long time.
:::

:::{warning}
Expand Down

0 comments on commit b948f9c

Please sign in to comment.