From 70dfd93d185b63e59ea6eb8492e8d5adf93bd27f Mon Sep 17 00:00:00 2001 From: akrherz Date: Tue, 5 Dec 2023 15:15:44 -0600 Subject: [PATCH] convert to pybufrkit and iterate --- environment.yml | 4 +- examples/BUFR/ISAB02_CWAO.bufr | Bin 0 -> 741 bytes examples/BUFR/ISIA14_CWAO.bufr | Bin 0 -> 834 bytes examples/BUFR/ISND01_LEMM.bufr | Bin 0 -> 1374 bytes examples/BUFR/ISXT14_EGRR.bufr | Bin 0 -> 15190 bytes parsers/pywwa/workflows/bufr_surface.py | 295 ++++++++++++++++-------- tests/workflows/test_bufr_surface.py | 29 ++- 7 files changed, 232 insertions(+), 96 deletions(-) create mode 100644 examples/BUFR/ISAB02_CWAO.bufr create mode 100644 examples/BUFR/ISIA14_CWAO.bufr create mode 100644 examples/BUFR/ISND01_LEMM.bufr create mode 100644 examples/BUFR/ISXT14_EGRR.bufr diff --git a/environment.yml b/environment.yml index 027e3f31..d24ce295 100644 --- a/environment.yml +++ b/environment.yml @@ -25,14 +25,14 @@ dependencies: # Temporary until my breakage clears - openssl <3.2 - pandas - # BUFR processing - - pdbufr # geoplot usage - pillow - pint - psycopg # Transient via pyIEM - pyarrow + # BUFR processing + - pybufrkit # Models - pydantic # gini reader diff --git a/examples/BUFR/ISAB02_CWAO.bufr b/examples/BUFR/ISAB02_CWAO.bufr new file mode 100644 index 0000000000000000000000000000000000000000..5493644c3d6905c2238517efd38d191b6949a909 GIT binary patch literal 741 zcmZ8fZ%9*76hH62xu;WmFHIpe-%}x`25w97Pug=DK{JYsD3!=IMzTJ{G-xbodn2eU zbULsyiU@+}OQ95EQt!(!8RVxzqTuD1{vh!RgDu8xXB!rDINbB+cg{V(!@W#lp{cRO zDxo=W)ZS&WN!`cohb2ps&0?`gy}fn>x{mGd1+riOC*ym+Q-7wG zc>Ir@RiwzY{BJY}X8w&4%(zNLv~_H8eEMXe1W>&4sb#}FZN0GmG~ufqs(yZVZEoEE zt^e5|zz*9>-~4NFcyOxV&Mx_i<60Ql?+1(Qk)e6V*CRII6)ixMJge&MJnpV$G%ltL zH=~Y`qS5QY;^?`?(uR+D$SF{4#0!zEzpdDpBI(=1F#r3Bme*a literal 0 HcmV?d00001 diff --git a/examples/BUFR/ISIA14_CWAO.bufr b/examples/BUFR/ISIA14_CWAO.bufr new file mode 100644 index 0000000000000000000000000000000000000000..b068ed3bd0249258d4ee0c29aa8fa71d9f1d02e2 GIT binary patch literal 834 zcmZWoO=uHQ5T0aH6B}b9Y85NyS*@*T+C*ElB28Yln`U*BT{pYYpx{A8q=zDS=t0aT zicnglD0t8wilr(Ah2|uPbn&1!@edJuF!Z8SkU~X*P4k@HT0J-{Z|0jf^L@jx&*Agc zckGW4bYe!Nkq*Kir!on5?u&%On2hTQ6}UzhKm$N0EE@s$mo;@8EwCEEXTDUxIS)4+ z50mhrU*FTX1ugexG$E9bbEH*oBNEM02~|Yo z?rD)xo2E0%Wwx#&tJ)uEJLFSjo@%0?{v|!83vq#OKZ3195;C+`AfhT}t&&iW9+G5T zv3vht0b>sEE)-#_3z~|sr3eAUQLTB`+Y1vz0NZz*)7v8t4u)@O?`9sEpAHS2-;`(7 ziD#bV$-T+atC6wq!*fqUO+{%T|265nw%!D9xSuiiY~#JOcbxxdY;KFq7~q0I&+F1O z>KB4B{ah)&7{0ppv@zFfCLG{rCfs%bfuSEs z`l~x*1R3Kh!9qDdALZtw)(;cqYVLC0tokv{YHo%Xtm4UAJ=fdgpxI`HFu17|2p9RL6T literal 0 HcmV?d00001 diff --git a/examples/BUFR/ISND01_LEMM.bufr b/examples/BUFR/ISND01_LEMM.bufr new file mode 100644 index 0000000000000000000000000000000000000000..8526ad67018b1b8b5d9ca69fa1e780cb8f5409a6 GIT binary patch literal 1374 zcma)6ZERCz6h5~fc6Vbbu2Q2Ww!@BXnV4$EhiJH+u(_p<5!fW)4`$;B&WQZc5HokL zQ=^zg$TDU$LdR%?kw8o!koa|rE&+a+2|-5|_1-KdTcWrc>)N&V-s3r!^n*XfC+#`U zIrpBo&-T;$qu(pVWsQ_s>?H zzu)4UAF40Q1OicC# z5F;RA3KW@M&|STx45w`^D(z#Gl7y7_Vp{QHhD~bYkS`yFyBBUEW+wS5skGRaVc=j{ zfcXAnhke)S^Fw<@89Z|r_c-r={ebVKM=P$p^L+igaWNC%I>WK^i^FFm#HBRcr*ej2 zc*xi?F%}oUnuktwRD@w((pYlzX>&O;bfH8>cEH zl^XjJ7Os{!vA@LO!OB>l=vdk4=&Ovy-Wa}D6a+A`t06XBIXK+1A_^yEG=Zi)>EQ7}7R}>Ewr4o0X{)aqP~Xok3j&l5RoWOIl`n2f48-0w2!DvS>E>9T*QNUHrYU%Xbve& znMG>LtV~Z?CHpd{PnHFUlMhWabTqYxT(`mbw|-7I(DCyJy{|Zz2F2~yr~32naUJ#d zmv3hdGSzD|b_|P$J1H?W?l^{+=F12bJU$k*(bS+xUC2pvhN;4RhEchl+F~2${6*>W e2KRuaA@+As6_UyY_9bNIOPpAC9fAHlo&N$21(dS@ literal 0 HcmV?d00001 diff --git a/examples/BUFR/ISXT14_EGRR.bufr b/examples/BUFR/ISXT14_EGRR.bufr new file mode 100644 index 0000000000000000000000000000000000000000..076104e1098b394ffcbd7b1b2c4514349245bafa GIT binary patch literal 15190 zcmeI(X;f2Jy8z%2a)dylOhIPJgdq^fiA)iKBm~R>f&_?yf-)BrEmCWpS_zX(Qb0sR zw8|s`D&T|@G7EwhMJQ9f>onZ zwH#3wOZ+b^C?ptlu=H9S7--3NM1>3ZI7*b>fdp3pi9j$U5PS$dD; z3(ORjT|jwIQ-J+YfJR0zAkR>vqAuUoONJQ@1?nY3O$f{vG7VyE1Q=g3%uq0YWS9v9 z27!LSOoVZOIJklN6G3t+R@egN!3;xy`-2QM8gQSJF_<|T0rg@~GlRrIOdbHnK}=2s z=L^h~R2EPk+`#-PsSt|<+ouGLico<(!_Cp{+xAfrr~&ojaS#*!FAW27Peq^xSE1~H{Bm=}oA5IL}(LX8aOnFcdmHn9KFV8*ug z`*!}(U?!)!0dY{{>40$%iv-tL8pMo^9f5rsYA6Qu!x(0wSYS;wh8YH)$Bkj86awp{ zF~rOfHtE~Ez%3TYpE&|Cwz(Nl9@H?fUN*!~1jwH`)P%siH%DOxuJ`6B#6)7?{xF9c zIKJj0h+#-PkQa#25e&#P)MzM>XFA+~{NZsB!+`sUPJ@^Tjrq18bf}4d`y}8}@Cz zJQ~Enac1=mv)W;7a* z7ZhsX85pHO%$yF)Qxs-wNc*5hrF=VXC>v%%FfVL~Im&?Z3kow4)eNW?W?}@Ge<;Kp z5%9ANxS_!Q;fO#Ds27iem=G8bM+9cF;cxrvh`)1qw4_Wp%4>+dxay!gh(i`o)rvJOkjqltYaM*1>}q zWbcu~Ob9*)$YCZjW&wGDnh=}^BA6+FeM>Ham>67}~aOv!2BsNm<0oQQDC4cU50@;sL2MwIEX>^csaxrh!WV}phnXL z?n?#Kz%!wO1~E3cjw@h>@`3eF0W~==Ulb_BMBsX*fExIWqYyz%44%UjFvE0#`%(cl zKCr(jFo-G1;J86dNkxGD4QeR(e6EC8D1rich8sBMp^)c=P-Cj)xAPflZ1DLv6l#9J zIv&b~7z&;@LZJqJE+2}577qDb{(mrFjlknz2AogvIM8V9uy6Ah4mI!@F&y&yHqXs@(rWV4Y^h1xial-jfNj0d`>;yy=I`sZ%An;JYEGtRVTjI7>`mFOXRd?pKONlmc2?*le@ZNig{Oy7VTpll|EaZJyhCs7>)M(?oH|M z&kRroCrX-bm8~f^Fge3o`3p zKUQMVKQ8RQlO!e`>5dtuCJZMBY5Qn)TBlygOf~P%;Jn92j0cc{cx}s@QMYQl>%9}L z@`mM%fI;b@Lknsrzf;`iq~|Hf+H2mI{PD^=*(h&vb9?Z?!LK|*jIO_I27eHM`7k&MXuS*?$pJ(q&;^M?3*9|W_{H?NI5oD-#z^2 zYQ@;i>OAi?MtOagJ9hn=vFcgYr6JvUa|G@d(Vk{CnZkf~F^8u5EY`1c8+$OvddwoI zePD{Yci8{_KjvrumKMCrroUj{uJAo7<6qhP2&nQibpDIPaQz!o@2tX*Xr4&iJrZL4%wgs&l@K{*%KzHI6&cH}&)$ zWG$x6Dl)`Od$gSM3=WTHFS%^`_^R{OGT*+$Jwa4h%n~a`LqjyFhEa*J9%D}yq^Ya4ISRA=aX&GPValMjO>22fp<4o!EKWXb$bMN2m zoJiQw+h3`%$9vAo*8>-yln!Kt?$tS2za3lpxvb6KF{K>iMHDS>`(k|d^MYjBf-3HJ zON;xCTJK^QwMfxqHLYz#gV(>!KJ0GpQ^KOz%0ly3pD?>+67S2)n|^*4_3`&dzs~Eo@PAyh z|Ek|^`XR2TB&VjECeP<~mMeESH*vm?3K+L;3zu>#vJ_I~#rXOyH|fe+N&V*YKCd_K z-e}03vQ_RDJm|3ciwsHke{4j zHqmL*m$_{B+AsiXt2-Ne4+?f3jFz%`vM`3Oo|M5ZOYn8L zv3uRVx(Ch@tBG`Z-_@o2%8oidzuf=Fue+!UE0c=ggN;YTk%7nbs>7^CKD=mu{h-L^ zJZqUknpu%e?baL0VOO_0*ldeeZhKqz;dv;%q}h{ampc|dx9j;?>pzl%HZQ9AYkN?f zq-G!^N$=bU`^OO0WFvM6JO)ZGk8a8ddZtHO&^<`&xX3pRgjlD;;{e(X43N%XHH zWw~KGn|7Z+Z1a}hj`C9((V;m# zC+6NZKi=RP@e8NzlOl+Du4eI1NVuKWxsab<@jpIPIvCW&TL&tOs_N&LOwZws{O6yb z%J#MuE>K0MrC!z`p7!oe+*yd;U_@6f-{f*~di|=Nu}eFyE6!shItWQ31w;2*;qt&(4%>I(gs$cf?u5vDD&A<4GY125mHg)VC(j^yUmBs2`Vb3r2S-Iiqg=g1)EU$Ym43?YLmN^IQX`F8# z!HN7Up!1RHey94`3}c0spu+gl#}nJ%huExaT-xeiF|psP=dDCOnO=HzQ&ZWWh3_9Q zU!9#x)w45UWuLOvUmeJvX8DeU4mX}?isXh0f}3LxKJ6D@FV)$~w|wWBleoj+&it@R z|Fgq_k?Gj>q0*N5Xp$;Eh&Wh7&2}@@+x&!g;@XN;wLzBx4zHMf^R#&#mFHlxJm&Ki z@;}9#m3sp&3YPs`a_pn6q-iiC`pEtu{e?A#J}wQ#>oXrjQ_pq#=P~vhd0md`&+jhc zK4d8`_&wOP_uMq*ZdA0*IlbL!h|9KGg^ zajPN{7)4%#ND`H=j!`O)(*(mfaQ>7b7f55K>- zxmWPIAf_cZd;Q$G?QX|YHwWySKIA;QYDDi+o3%ePu-v;Yly$Qi^CM1_B7I9EB@G=J z|I+PmW$ruIcjm`k$p^TP+>~}Qldu9Om-)6y{)z3s-AP-taplO=#db5NyDn?bJUR9K z@2dw_&OLa+zVP5-rda;m#^(0Y;MaSOJh5N)^V(y7-@DlO+tx@?!RwFoCvU{*S!rZ; z$Ho4PtqJcZhhE+MJl*uqXxyuJYyUMh^7kw8+lL`aN@JbzHtO9O(^J<&Y$_T<%2`b= z*seK3D~t65yY=^SIWf&ieKEJ6rtz-NQWm7w)5+`6g=0}ag@wFqX&SBe>h#G~XGKUz zma|gg?T90DRhy8M(3P|igb*;Fn8Z!3B@+tpw+z)Hhy+VrH60_h6jN0T{sKt}(NU2L zjde%_57)F7{6|x@M*O=L;h;Qy_^8(4N@OH1Gd^Kffd(O|EW6e*Eo64e(@qM%ReQvS z*hor|8LJhj6Zi&d9{MU5V^yg(p;3Y8;CDIxR#lFSLrgJK^*X6X_@vib6AS1>9sF_& z)l*wyo36?TZ`B;aw2n&i#~mT~k6Sew@#YazIA&^X=Bi;+)rBhqwVO2xW)UJhNCdoO zyiEKLUcwI={6*^1_;_FdBd*U?SQ%O1pZ!n997wmz@;E@(;BS||n@s58jlaN$Gj^Lzb;i)rJ#T%$( zcv`emS~OC&Id#V4g?}xL5MY%WZ=}k?U*Ii#Gsk05+6G)y9h6H9u-GJ2UrKHg6JT3+ff-d{W(?>>I*0(Cy#_r>X(@S^aZ zZrjm|cM5MFUPVt{JShdsFMW!ahQEckQ=o3CM$o}CjQ{!1*s+KbqeIzS_SE8+tjIXa zEvmI767aX2)sygH!c&cpApREq8h1Sv2X6zOSUgww7&)p3_@_TLR~t;t;^1>BFRK@y zJh_^U_(0=*$NLh15b(a>Lx#5&uU(Un;!TRc3&%UoAX?&+ikdR)l^&w65<2XXEz8*4 zU*58H$JuVPpp&j%Z7I`>zNmj``>gR< zcg(r%qreo+q~7`fxdi_(;2ztrR1niUK5uF>n%KS1s^I`?(X~c@%BR_>&zGEMjXHFd zku)i&f7jhFd71*!<$In~>WSr7?{WR=j%l@U_vdUl`l&^)zh7{yuH`k)_S8}C$rG9s z%ckMWCb?YR@w(O_N+OEZ^mK|}XgOCrw3(Y1-q_IboHF3x@_Bsnp6^eB)GNE{>}u+h zkM>h33;B0D(@3`D6Lr_t(DPJ{`5i%)4JnNeJJl%zoF?~frcK`QX0zTPas%7WqD7n8 zkXJu&+?ADB(%f++*T653zokQ;F_oV}An0Ax`rMp(B~4?1-X>H_o0I1u*w{A|{9@_x z)2)Af?0;q3&^$!RvqL{$u2-AVZEEOQxQhIX;8@?4om^q_?S5ZAwLO|4k<0AkFBZ*c|IzqM$BDU=Ky+L8KK-dcesAXjy#f4Jc3oq) zZ92+~n8YSS$%%;66~lb?wV$ zhu(@0xv!3|yjV}4$`Ty7(s6Y4nMOW--INBuren*eHIq$~mluBVBQ#TbnIwuOe?$AY z;lT37o{oiHM+}>iyXyk_q(>#EObr^83nH#oT2IxVwjJ)Dm$=1s>Q%*)HT%(by{4MB z^NtT*V!T=NJD=2cLSu@#;J~$Lr>T4P7g|?XQA)U++7k@FT>9*^A!cH3)9lXYzDKR? zH?(@PD8Dwgbku#enGwwD_0^^LHy!H=Rh^R&oc93XFPvjT=^?I$c3Vc`^1^E)|{U zzC7O6dZRw4UQm7|>aq>n?&P`g*@+I#{Jt9xwAbL1foVb6$S>}QwrjZLU@{o^>UrCO zylYxt)_iW7zP!&mueNz*U*O~CrA_%gKJsULJK~wmZ@#h#!@!yUw8%YfOI`jdADJWa3*pH8rL{$aBIawkdWVCs}r<*|e5iqZs9 z`Bt7$NTtKw)eE1$sw%=R%vk;_Yoc~OhBQ(xM?d{KSD*BpGsQQ`loe|An7vS#-t$Yqc?RMuWBIz#`8OhnlfL!M zX(br5VtSFV7>XEi3Q8Avdaia%UCOR)&h2b-9hF=IT%c`V(U8R6vf^q3@u z(`^ux$n9Y1h}9o28sFlg{@ATnb9;K$;%Jtj^7EqTCWaDb@v4m9$xs}P9jKR zmP)@KOAc_WEduHStD}wh9qX(;9?xOMJ5`4z+^kU<5BMJLjc=XFf&w_VX#d+nwd|CBwiD#o5GAn@L z#`NrRt&DN$@L)|g)Wvp$scoZ06>!pYP|q+YiG-0}fmO6}t5*i6_SmsV9!x{N>ZUpK zu6uNuo3s&2pOAaAVydDROB!vY7kox1&BvnHRUuv#kNN72q9{I#74J2=P(QNBC_^hS z?U*a6)Q9xaKeHxYTHzDBVOiTtoiulc=o$A8!aQ%2$mmvgwLD{@9ldh(!ch<139iI5 z%uym>#>OqnbdOHYlx5rxXW@&~rzLDXMb|lBbqkcu2#juU8T-tuwB(Op5viYcSGPn{ zMp?;1gS(Z?_IW6jl;sD!}{?(_8e*yDuq07nzDG)-J+TQ0U_ zC>B?aa!Lfo+DO{yOhZUCuUKo=4?@2hiDhoam0Rm=ON8cT*K)cQMmealPL}-i;Dwmz z1@7}mW9uCbNEy4Z)ikxllkW5M`??rvZY835COf^OIfL=vI~C_J4h4KlS;n1nd0c$t z<`hEA$*3Ar^*;HzP1~z0O29Tq78YfDMz$2o7FiT!R~4b8)D<>!wX6U7aN(i5r3vMc z;NU>iGH!8Br=s9Oi23Y%@5itAlC~+Wb26e11>9fvouD8qYQ2DKD*dw@q)b$2U=Irt_)chrnkx1MtO|(NKUbYsf`@1x!yKnPNjIOw?EU& zaey`&ov#j-7KMOc&?p*GNqJ&RZ^*h@+0(#o#x x7#fXfPIkcuWr1$Nm6dkCYGiuS&2n7L=IluCh-8(zns5I^AUNY+`2RGk{$KyMF8}}l literal 0 HcmV?d00001 diff --git a/parsers/pywwa/workflows/bufr_surface.py b/parsers/pywwa/workflows/bufr_surface.py index 7bcc617d..ec6afc58 100644 --- a/parsers/pywwa/workflows/bufr_surface.py +++ b/parsers/pywwa/workflows/bufr_surface.py @@ -8,12 +8,13 @@ """ # stdlib -import os -import tempfile import warnings +from datetime import timedelta # 3rd Party -import pdbufr +from pybufrkit.decoder import Decoder +from pybufrkit.errors import UnknownDescriptor +from pybufrkit.renderer import NestedJsonRenderer from pyiem.nws.product import TextProduct from pyiem.observation import Observation from pyiem.util import LOG, convert_value, utc @@ -70,6 +71,7 @@ "GMFK": 504, # Morocco "GMFO": 504, # Morocco "GMMC": 504, # Morocco + "GMME": 504, # Morocco "GMMG": 504, # Morocco "GMMI": 504, # Morocco "GMMK": 504, # Morocco @@ -79,8 +81,10 @@ "GMMS": 504, # Morocco "GMMW": 504, # Morocco "GMMX": 504, # Morocco + "GMMZ": 504, # Morocco "GMSE": 504, # Morocco "GMTA": 504, # Morocco + "GMTI": 504, # Morocco "GMTL": 504, # Morocco "GMTN": 504, # Morocco "GMTT": 504, # Morocco @@ -102,6 +106,7 @@ "LOWM": 40, # Austria "LPMG": 620, # Portugal "LQSM": 760, # Syria + "LSSW": 756, # Switzerland "LTAA": 792, # Turkey "LYBM": 688, # Montenegro "LYPG": 688, # Montenegro @@ -135,6 +140,7 @@ "SOCA": 74, # French Guiana "SOWR": 724, # Spain "SPIM": 604, # Peru + "SUMU": 858, # Uruguay "TBPB": 52, # Barbados "TLPL": 662, # Saint Lucia "UAST": 398, # Kazakhstan @@ -156,6 +162,17 @@ UNKNOWNS = [] WIGOS = {} NETWORK = "WMO_BUFR_SRF" +DIRECTS = { + "004001": "year", + "004002": "month", + "004003": "day", + "004004": "hour", + "004005": "minute", + "001015": "station_name", + "007030": "elevation", + "005001": "lat", + "006001": "lon", +} def bounds_check(val, low, high): @@ -175,29 +192,24 @@ def load_xref(txn): for row in txn.fetchall(): WIGOS[row["wigos"]] = { "iemid": row["iemid"], - "tzname": "UTC" if row["tzname"] is None else row["tzname"], + "tzname": row["tzname"], } LOG.info("Loaded %s WIGOS2IEMID entries", len(WIGOS)) def add_station(txn, sid, data): """Add a mesosite station entry.""" - if "#1#longitude" not in data or "#1#latitude" not in data: + if "lon" not in data or "lat" not in data: LOG.info("Skipping %s as no location data", sid) WIGOS[sid] = {"iemid": -2} return - sname = data.get( - "#1#stationOrSiteName", - data.get("#1#stationName", data.get("#1#longStationName")), - ) + sname = data.get("sname") if sname is None: LOG.info("Skipping %s as no station name %s", sid, data) WIGOS[sid] = {"iemid": -2} return - sname = sname.replace(",", " ") - elev = data.get( - "#1#heightOfStationGroundAboveMeanSeaLevel", data.get("#1#elevation") - ) + sname = sname.replace(",", " ").replace("\x00", "") + elev = data.get("elevation") txn.execute( """ INSERT into stations(id, wigos, name, network, online, @@ -210,8 +222,8 @@ def add_station(txn, sid, data): sid, sname, NETWORK, - data["#1#longitude"], - data["#1#latitude"], + data["lon"], + data["lat"], elev, sname, ), @@ -219,38 +231,21 @@ def add_station(txn, sid, data): WIGOS[sid] = {"iemid": txn.fetchone()["iemid"], "tzname": None} -def process_data(txn, data, prod): +def process_messages(txn, prod, msgs): """Do what we can do.""" - sid = None - if "#1#wigosIdentifierSeries" in data: - sid = "-".join( - [ - str(data["#1#wigosIdentifierSeries"]), - str(data["#1#wigosIssuerOfIdentifier"]), - str(data["#1#wigosIssueNumber"]), - str(data["#1#wigosLocalIdentifierCharacter"]), - ] + data = glean_data(msgs, prod.source) + if not data: + return + sid = data.get("sid") + if sid is None: + return + valid = data["valid"] + # Don't allow products from the future + if valid > (common.utcnow() + timedelta(hours=2)): + LOG.warning( + "%s %s is from the future %s", prod.get_product_id(), sid, valid ) - elif "#1#stationNumber" not in data: - # TODO: How do we handle this? return - else: - # We construct it via the local identifier - ccode = WMO2ISO3166.get(prod.source) - if ccode is None: - if prod.source not in UNKNOWNS: - UNKNOWNS.append(prod.source) - raise ValueError(f"Unknown WMO2ISO3166 {prod.source}") - return - sid = f"0-{ccode}-0-{data['#1#stationNumber']}" - - valid = utc( - data["typicalYear"], - data["typicalMonth"], - data["typicalDay"], - data["typicalHour"], - data["typicalMinute"], - ) meta = WIGOS.get(sid, {"iemid": None}) if meta["iemid"] is None: # prevent race condition @@ -266,71 +261,191 @@ def process_data(txn, data, prod): LOG.info("Skipping %s as iemid is currently -1", sid) return LOG.info("%s %s %s %s", sid, meta["iemid"], prod.get_product_id(), data) + # This likely means that IEM station metadata has yet to sync, so there + # is no iemaccess entry if meta["tzname"] is None: LOG.info("Skipping %s as tzname is None", sid) return ob = Observation(valid=valid, iemid=meta["iemid"], tzname=meta["tzname"]) - if "#1#airTemperature" in data: - ob.data["tmpf"] = bounds_check( - convert_value(data["#1#airTemperature"], "degK", "degF"), -100, 150 - ) - if "#1#dewpointTemperature" in data: - ob.data["dwpf"] = bounds_check( - convert_value(data["#1#dewpointTemperature"], "degK", "degF"), - -100, - 150, - ) - if "#1#relativeHumidity" in data: - ob.data["relh"] = bounds_check(data["#1#relativeHumidity"], 0, 100) - if "#1#windDirection" in data: - ob.data["drct"] = bounds_check(data["#1#windDirection"], 0, 360) - if "#1#windSpeed" in data: - ob.data["sknt"] = bounds_check( - convert_value(data["#1#windSpeed"], "m/s", "kts"), 0, 250 - ) - if "#1#pressureReducedToMeanSeaLevel" in data: - ob.data["mslp"] = bounds_check( - convert_value( - data["#1#pressureReducedToMeanSeaLevel"], "Pa", "hPa" - ), - 800, - 1200, - ) + ob.data.update(data) ob.data["raw"] = f"BUFR: {prod.get_product_id()}" ob.save(txn) -def process_dataframe(txn, df, prod): - """Process the dataframe.""" - for _, row in df.iterrows(): - process_data(txn, row.dropna().to_dict(), prod) +def render_members(members, msgs): + """recursive.""" + for member in members: + if isinstance(member, list): + render_members(member, msgs) + elif isinstance(member, dict): + if "factor" in member: + # print("FACTOR:", member["factor"]) + msgs.append(member["factor"]) + if "value" in member: + if member["value"] is not None: + # print(member) + msgs.append(member) + elif "members" in member: + render_members(member["members"], msgs) + else: + # print("Dead end", member) + pass + else: + # print(member) + msgs.append(member) + + +def glean_data(msgs, source): + """see what we can do with this.""" + data = {} + displacement = 0 + for msg in msgs: + # print(msg["id"], msg["description"], msg["value"]) + if msg["id"].startswith("001"): + data[msg["id"]] = msg["value"] + continue + if msg["id"] == "004024": # TIME PERIOD OR DISPLACEMENT + displacement = msg["value"] + continue + if msg["id"] in DIRECTS: + data[DIRECTS[msg["id"]]] = msg["value"] + continue + if msg["id"] == "010004": + data["pres"] = msg["value"] / 100.0 + continue + if msg["id"] == "010051": + data["mslp"] = msg["value"] / 100.0 + continue + if msg["id"] == "012101": + data["tmpf"] = bounds_check( + convert_value(msg["value"], "degK", "degF"), + -100, + 150, + ) + continue + if msg["id"] == "012103": + data["dwpf"] = bounds_check( + convert_value(msg["value"], "degK", "degF"), + -100, + 150, + ) + continue + if msg["id"] == "013003": + data["relh"] = bounds_check( + msg["value"], + 0, + 100, + ) + continue + if msg["id"] == "020001": + data["vsby"] = bounds_check( + convert_value(msg["value"], "m", "mile"), 0, 100 + ) + continue + if msg["id"] == "011002" and displacement >= -10: + data["sknt"] = bounds_check( + convert_value(msg["value"], "meter per second", "knot"), + 0, + 200, + ) + continue -def processor(prodbytes): + if msg["id"] == "011001" and displacement >= -10: + data["drct"] = bounds_check(msg["value"], 0, 360) + continue + if msg["id"] == "011041" and displacement >= -10: + data["gust"] = bounds_check( + convert_value(msg["value"], "meter per second", "knot"), 0, 200 + ) + continue + if msg["id"] == "011043" and displacement >= -10: + data["gust_drct"] = bounds_check(msg["value"], 0, 360) + continue + if "year" not in data: + return {} + try: + data["valid"] = utc( + data["year"], + data["month"], + data["day"], + data.get("hour", 0), # Unsure if this is too forgiving + data.get("minute", 0), + ) + except ValueError as exp: + LOG.info("ValueError in utc(): %s %s", data, exp) + return {} + # Attempt to compute a station ID + if "001125" in data: + data["sid"] = ( + f"{data['001125']}-" + f"{data['001126']}-" + f"{data['001127']}-" + f"{data['001128'].decode('ascii', 'ignore').strip()}" + ) + if "001002" in data: + ccode = WMO2ISO3166.get(source) + if ccode is None: + if source not in UNKNOWNS: + UNKNOWNS.append(source) + raise ValueError(f"Unknown WMO2ISO3166 {source}") + return + data["sid"] = f"0-{ccode}-0-{data['001002']}" + if "001015" in data: + data["sname"] = data["001015"].decode("ascii", "ignore").strip() + return data + + +def processor(txn, prod, prodbytes): """Protect the realprocessor""" # ATTM we are sending this to the general text parser, so to set # various needed attributes. - header = prodbytes[: prodbytes.find(b"BUFR")].decode("ascii") - prod = TextProduct(header, utcnow=common.utcnow(), parse_segments=False) - with tempfile.NamedTemporaryFile("wb", delete=False) as bio: - bio.write(prodbytes) try: - df = pdbufr.read_bufr(bio.name, columns="all", flat=True) - defer = IEMDB.runInteraction(process_dataframe, df, prod) - defer.addErrback(common.email_error, prod.get_product_id()) - except Exception as exp: - LOG.warning("%s %s", prod.get_product_id(), exp) - common.email_error(exp, prod.get_product_id()) - return None, None - finally: - os.unlink(bio.name) + bufr_message = Decoder().process(prodbytes) + except UnknownDescriptor as exp: + LOG.info( + "UnknownDescriptor %s %s %s", + prod.get_product_id(), + prod.source, + exp, + ) + return + jdata = NestedJsonRenderer().render(bufr_message) + for section in jdata: + for parameter in section: + if isinstance(parameter["value"], list): + # print("-->", parameter["name"], type(parameter["value"])) + for entry in parameter["value"]: + # print("----> entry") + if isinstance(entry, list): + msgs = [] + render_members(entry, msgs) + process_messages(txn, prod, msgs) + else: + # unexpanded_descriptors + # print(entry) + pass + else: + # print("-->", parameter["name"], parameter["value"]) + pass - return prod, df + +def ingest(prodbytes): + """Gets called by the LDM Bridge.""" + pos = prodbytes.find(b"BUFR") + if pos == -1: + LOG.warning("No BUFR found in %s", prodbytes) + return None, None + header = prodbytes[:pos].decode("ascii") + prod = TextProduct(header, utcnow=common.utcnow(), parse_segments=False) + defer = IEMDB.runInteraction(processor, prod, prodbytes[pos:]) + defer.addErrback(common.email_error, prod.get_product_id()) + defer.addErrback(LOG.warning) def ready(_): """Callback once we are ready.""" - bridge(processor, isbinary=True) + bridge(ingest, isbinary=True) def main(): diff --git a/tests/workflows/test_bufr_surface.py b/tests/workflows/test_bufr_surface.py index 7733962b..3cd2d0f9 100644 --- a/tests/workflows/test_bufr_surface.py +++ b/tests/workflows/test_bufr_surface.py @@ -7,6 +7,30 @@ from pywwa.workflows import bufr_surface +def test_badmonth(): + """Test that month of 13 is handled.""" + with open(get_example_filepath("BUFR/ISIA14_CWAO.bufr"), "rb") as fh: + bufr_surface.ingest(fh.read()) + + +def test_nonascii(): + """Test that non-ascii characters are handled.""" + with open(get_example_filepath("BUFR/ISAB02_CWAO.bufr"), "rb") as fh: + bufr_surface.ingest(fh.read()) + + +def test_nosid(): + """Test that having no station id is handled.""" + with open(get_example_filepath("BUFR/ISXT14_EGRR.bufr"), "rb") as fh: + bufr_surface.ingest(fh.read()) + + +def test_notimestamp(): + """Test that we don't error when no timestamp is found.""" + with open(get_example_filepath("BUFR/ISND01_LEMM.bufr"), "rb") as fh: + bufr_surface.ingest(fh.read()) + + @pytest.mark.parametrize("database", ["mesosite"]) def test_load_xref(cursor): """Test loading the xref.""" @@ -23,7 +47,4 @@ def test_simple(cursor): """Test simple.""" pywwa.CTX.utcnow = utc(2023, 12, 4, 2) with open(get_example_filepath("BUFR/IS_2023120401.bufr"), "rb") as fh: - prod, df = bufr_surface.processor(fh.read()) - assert prod.source == "SCSC" - - bufr_surface.process_dataframe(cursor, df, prod) + bufr_surface.ingest(fh.read())