Skip to content

Commit

Permalink
Refactor SVG camera
Browse files Browse the repository at this point in the history
  • Loading branch information
jdejaegh committed Jan 2, 2024
1 parent 3915d9f commit c9ec30b
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 58 deletions.
10 changes: 5 additions & 5 deletions custom_components/irm_kmi/camera.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def camera_image(self,
height: int | None = None) -> bytes | None:
"""Return still image to be used as thumbnail."""
# TODO make it a still image to avoid cuts in playback on the dashboard
return self.coordinator.data.get('animation', {}).get('svg').encode()
return self.coordinator.data.get('animation', {}).get('svg_still')

async def async_camera_image(
self,
Expand All @@ -68,18 +68,18 @@ async def async_camera_image(
async def handle_async_still_stream(self, request: web.Request, interval: float) -> web.StreamResponse:
"""Generate an HTTP MJPEG stream from camera images."""
self._image_index = 0
return await async_get_still_stream(request, self.iterate, self.content_type, interval)
return await async_get_still_stream(request, self.get_animated_svg, self.content_type, interval)

async def handle_async_mjpeg_stream(self, request: web.Request) -> web.StreamResponse:
"""Serve an HTTP MJPEG stream from the camera."""
return await self.handle_async_still_stream(request, self.frame_interval)

async def iterate(self) -> bytes | None:
"""Loop over all the frames when called multiple times."""
async def get_animated_svg(self) -> bytes | None:
"""Returns the animated svg for camera display"""
# If this is not done this way, the live view can only be opened once
self._image_index = not self._image_index
if self._image_index:
return self.camera_image()
return self.coordinator.data.get('animation', {}).get('svg_animated')
else:
return None

Expand Down
18 changes: 7 additions & 11 deletions custom_components/irm_kmi/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,9 @@ async def _async_animation_data(self, api_data: dict) -> RadarAnimationData:
unit=api_data.get('animation', {}).get('unit', {}).get(lang),
location=localisation
)
svg_str = self.create_rain_graph(radar_animation, animation_data, country, images_from_api)
radar_animation['svg'] = svg_str
rain_graph = self.create_rain_graph(radar_animation, animation_data, country, images_from_api)
radar_animation['svg_animated'] = rain_graph.get_svg_string()
radar_animation['svg_still'] = rain_graph.get_svg_string(still_image=True)
return radar_animation

async def process_api_data(self, api_data: dict) -> ProcessedCoordinatorData:
Expand Down Expand Up @@ -146,7 +147,6 @@ async def download_images_from_api(self,
_LOGGER.debug(f"Just downloaded {len(images_from_api)} images")
return images_from_api


@staticmethod
def current_weather_from_data(api_data: dict) -> CurrentWeatherData:
"""Parse the API data to build a CurrentWeatherData."""
Expand Down Expand Up @@ -300,7 +300,7 @@ def create_rain_graph(self,
api_animation_data: List[dict],
country: str,
images_from_api: Tuple[bytes],
) -> str:
) -> RainGraph:

sequence: List[AnimationFrameData] = list()
tz = pytz.timezone(self.hass.config.time_zone)
Expand Down Expand Up @@ -334,10 +334,6 @@ def create_rain_graph(self,
f"{'satellite' if satellite_mode else 'black' if self._dark_mode else 'white'}.png")
bg_size = (640, 490)

svg_str = RainGraph(radar_animation, image_path, bg_size,
dark_mode=self._dark_mode,
# tz=self.hass.config.time_zone
).get_svg_string()

# TODO return value
return svg_str
return RainGraph(radar_animation, image_path, bg_size,
dark_mode=self._dark_mode,
tz=self.hass.config.time_zone)
5 changes: 2 additions & 3 deletions custom_components/irm_kmi/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ class AnimationFrameData(TypedDict, total=False):
position: float | None
position_higher: float | None
position_lower: float | None
rain_graph: bytes | None
merged_image: bytes | None


class RadarAnimationData(TypedDict, total=False):
Expand All @@ -44,7 +42,8 @@ class RadarAnimationData(TypedDict, total=False):
hint: str | None
unit: str | None
location: bytes | None
svg: str | None
svg_still: bytes | None
svg_animated: bytes | None


class ProcessedCoordinatorData(TypedDict, total=False):
Expand Down
1 change: 0 additions & 1 deletion custom_components/irm_kmi/manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
"iot_class": "cloud_polling",
"issue_tracker": "https://github.com/jdejaegh/irm-kmi-ha/issues",
"requirements": [
"Pillow==10.1.0",
"pytz==2023.3.post1",
"svgwrite==1.4.3"
],
Expand Down
117 changes: 79 additions & 38 deletions custom_components/irm_kmi/rain_graph.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Create graphs for rain short term forecast."""

import base64
import copy
from typing import List

import pytz
Expand All @@ -23,8 +24,9 @@ def __init__(self,
graph_height: float = 150,
top_text_space: float = 30,
top_text_y_pos: float = 20,
bottom_text_space: float = 45,
bottom_text_y_pos: float = 215
bottom_text_space: float = 50,
bottom_text_y_pos: float = 218,
auto=True
):

self._animation_data: RadarAnimationData = animation_data
Expand Down Expand Up @@ -54,16 +56,36 @@ def __init__(self,
raise ValueError("bottom_text_y_pos must be below the graph")

self._dwg: Drawing = Drawing(size=(self._svg_width, self._svg_height), profile='full')

self.draw_svg_graph()
self.draw_current_fame_line()

self.draw_description_text()
self.insert_background()
self.insert_cloud_layer()
self.draw_location()

def draw_svg_graph(self):
self._dwg_save: Drawing
self._dwg_animated: Drawing
self._dwg_still: Drawing

if auto:
self.draw_svg_frame()
self.draw_hour_bars()
self.draw_chances_path()
self.draw_data_line()
self.write_hint()
self.insert_background()
self._dwg_save = copy.deepcopy(self._dwg)

self.draw_current_fame_line()
self.draw_description_text()
self.insert_cloud_layer()
self.draw_location()
self._dwg_animated = self._dwg

self._dwg = self._dwg_save
idx = self._animation_data['most_recent_image_idx']
self.draw_current_fame_line(idx)
self.draw_description_text(idx)
self.insert_cloud_layer(idx)
self.draw_location()
self._dwg_still = self._dwg

self._dwg_animated.saveas("animated_rain.svg")

def draw_svg_frame(self):
"""Create the global area to draw the other items"""
self._dwg.embed_font(name="Roboto Medium", filename='custom_components/irm_kmi/resources/roboto_medium.ttf')
self._dwg.embed_stylesheet("""
Expand All @@ -78,18 +100,23 @@ def draw_svg_graph(self):
rx=None, ry=None,
fill=fill_color, stroke='none'))

self.draw_hour_bars()
self.draw_chances_path()
self.draw_data_line()
self.write_hint()
def draw_description_text(self, idx: int | None = None):
"""For every frame write the amount of precipitation and the time at the top of the graph.
If idx is set, only do it for the given idx"""

def draw_description_text(self):
"""For the given frame idx, write the amount of precipitation and the time at the top of the graph"""

times = [e['time'].astimezone(tz=self._tz).isoformat(sep=' ', timespec='minutes') for e in
times = [e['time'].astimezone(tz=self._tz).strftime('%H:%M') for e in
self._animation_data['sequence']]
rain_levels = [f"{e['value']}{self._animation_data['unit']}" for e in self._animation_data['sequence']]

if idx is not None:
time = times[idx]
rain_level = rain_levels[idx]

paragraph = self._dwg.add(self._dwg.g(class_="roboto", ))

self.write_time_and_rain(paragraph, rain_level, time)
return

for i in range(self._frame_count):
time = times[i]
rain_level = rain_levels[i]
Expand All @@ -107,17 +134,19 @@ def draw_description_text(self):
repeatCount="indefinite"
))

paragraph.add(self._dwg.text(f"{time}", insert=(self._offset, self._top_text_y_pos),
text_anchor="start",
font_size="14px",
fill="white",
stroke='none'))
self.write_time_and_rain(paragraph, rain_level, time)

paragraph.add(self._dwg.text(f"{rain_level}", insert=(self._svg_width / 2, self._top_text_y_pos),
text_anchor="middle",
font_size="14px",
fill="white",
stroke='none'))
def write_time_and_rain(self, paragraph, rain_level, time):
paragraph.add(self._dwg.text(f"{time}", insert=(self._offset, self._top_text_y_pos),
text_anchor="start",
font_size="16px",
fill="white",
stroke='none'))
paragraph.add(self._dwg.text(f"{rain_level}", insert=(self._svg_width / 2, self._top_text_y_pos),
text_anchor="middle",
font_size="16px",
fill="white",
stroke='none'))

def write_hint(self):
"""Add the hint text at the bottom of the graph"""
Expand All @@ -127,7 +156,7 @@ def write_hint(self):

paragraph.add(self._dwg.text(f"{hint}", insert=(self._svg_width / 2, self._bottom_text_y_pos),
text_anchor="middle",
font_size="14px",
font_size="16px",
fill="white",
stroke='none'))

Expand Down Expand Up @@ -226,12 +255,12 @@ def draw_hour_bars(self):

if is_hour_bar:
graph_rect_center_x = x_position
graph_rect_center_y = self._graph_bottom + 15
graph_rect_center_y = self._graph_bottom + 18

paragraph = self._dwg.add(self._dwg.g(class_="roboto", ))
paragraph.add(self._dwg.text(f"{time_image.hour}h", insert=(graph_rect_center_x, graph_rect_center_y),
text_anchor="middle",
font_size="14px",
font_size="16px",
fill="white",
stroke='none'))

Expand All @@ -241,34 +270,43 @@ def draw_hour_bars(self):
end=(self._graph_width + self._interval_width / 2, self._graph_bottom),
stroke='white'))

def draw_current_fame_line(self):
def draw_current_fame_line(self, idx: int | None = None):
"""Draw a solid white line on the timeline at the position of the given frame index"""
x_position = self._offset
x_position = self._offset if idx is None else self._offset + idx * self._interval_width
now = self._dwg.add(self._dwg.line(start=(x_position, self._top_text_space),
end=(x_position, self._graph_bottom),
id='now',
stroke='white',
opacity=1,
stroke_width=2))
if idx is not None:
return
now.add(self._dwg.animateTransform("translate", "transform",
id="now",
from_=f"{self._offset} 0",
to=f"{self._graph_width - self._offset} 0",
dur=f"{self._frame_count * 0.3}s",
repeatCount="indefinite"))

def get_svg_string(self):
return self._dwg.tostring()
def get_svg_string(self, still_image: bool = False) -> bytes:
return self._dwg_still.tostring().encode() if still_image else self._dwg_animated.tostring().encode()

def insert_background(self):
with open(self._background_image_path, 'rb') as f:
png_data = base64.b64encode(f.read()).decode('utf-8')
image = self._dwg.image("data:image/png;base64," + png_data, insert=(0, 0), size=self._background_size)
self._dwg.add(image)

def insert_cloud_layer(self):
def insert_cloud_layer(self, idx: int | None = None):
imgs = [e['image'] for e in self._animation_data['sequence']]

if idx is not None:
img = imgs[idx]
png_data = base64.b64encode(img).decode('utf-8')
image = self._dwg.image("data:image/png;base64," + png_data, insert=(0, 0), size=self._background_size)
self._dwg.add(image)
return

for i, img in enumerate(imgs):
png_data = base64.b64encode(img).decode('utf-8')
image = self._dwg.image("data:image/png;base64," + png_data, insert=(0, 0), size=self._background_size)
Expand All @@ -290,3 +328,6 @@ def draw_location(self):
png_data = base64.b64encode(img).decode('utf-8')
image = self._dwg.image("data:image/png;base64," + png_data, insert=(0, 0), size=self._background_size)
self._dwg.add(image)

def get_dwg(self):
return copy.deepcopy(self._dwg)

0 comments on commit c9ec30b

Please sign in to comment.