Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Authentification par cookie / token #37

Closed
wants to merge 9 commits into from
1 change: 1 addition & 0 deletions resources/addon.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ def channel(channel_id: str):
listitem.setProperty('inputstream.adaptive.manifest_update_parameter', 'full')
listitem.setProperty('inputstream.adaptive.license_type', stream['license_type'])
listitem.setProperty('inputstream.adaptive.license_key', stream['license_key'])
listitem.setProperty('inputstream.adaptive.play_timeshift_buffer', 'true')
xbmcplugin.setResolvedUrl(plugin.handle, True, listitem=listitem)

@plugin.route('/iptv/channels')
Expand Down
119 changes: 97 additions & 22 deletions resources/lib/provider_templates/orange.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@
from dataclasses import dataclass
from datetime import date, datetime, timedelta
import json
import os
import re
from urllib.error import HTTPError
from urllib.parse import urlparse
from urllib.parse import urlparse, quote
from urllib.request import Request, urlopen
import xbmcvfs

from lib.providers.provider_interface import ProviderInterface
from lib.utils import get_drm, get_global_setting, log, LogLevel, random_ua
from lib.utils import get_drm, get_global_setting, log, LogLevel, random_ua, get_addon_profile

@dataclass
class OrangeTemplate(ProviderInterface):
Expand All @@ -28,17 +31,15 @@ def __init__(
self.groups = groups

def get_stream_info(self, channel_id: int) -> dict:
req = Request(self.endpoint_stream_info.format(channel_id=channel_id), headers={
res, cookie, tv_token = self._auth_urlopen(self.endpoint_stream_info.format(channel_id=channel_id), headers={
'User-Agent': random_ua(),
'Host': urlparse(self.endpoint_stream_info).netloc
})

try:
with urlopen(req) as res:
stream_info = json.loads(res.read())
except HTTPError as error:
if error.code == 403:
return False
if res is None:
return False

stream_info = json.loads(res)

drm = get_drm()
license_server_url = None
Expand All @@ -47,6 +48,7 @@ def get_stream_info(self, channel_id: int) -> dict:
license_server_url = system.get('laUrl')

headers = f'Content-Type=&User-Agent={random_ua()}&Host={urlparse(license_server_url).netloc}'
headers += f'&Cookie={quote(cookie)}&tv_token={quote(tv_token)}'
post_data = 'R{SSM}'
response = ''

Expand All @@ -63,25 +65,27 @@ def get_stream_info(self, channel_id: int) -> dict:
return stream_info

def get_streams(self) -> list:
req = Request(self.endpoint_streams, headers={
'User-Agent': random_ua(),
'Host': urlparse(self.endpoint_streams).netloc
})

with urlopen(req) as res:
channels = json.loads(res.read())
nuxt, _, _ = self._get_auth()
params = re.search(r'}\((.*?)\)\);', nuxt).expand(r'\1')
params = re.sub(r'Array\(.*?\)', '[]', params)
params = json.loads(f'[{params}]')
channels = re.search(r'{channels:(\[.*?\]),channelsPC', nuxt).expand(r'\1')
for rep in [ ('{', '{"'), ('}', '"}'), (':', '":"'), (',', '","'), ('}","{', '},{') ]:
channels = channels.replace(*rep)
channels = json.loads(channels)

streams = []

for channel in channels:
channel_id: str = channel['id']
channel_id = params[self._index(channel['idEPG'])]
logoindex = re.search(re.escape(channel['logos']) + r'\[3\]=.*?path:(.*?)}', nuxt).expand(r'\1')
streams.append({
'id': channel_id,
'name': channel['name'],
'preset': channel['zappingNumber'],
'logo': channel['logos']['square'].replace('%2F/', '%2F') if 'square' in channel['logos'] else None,
'id': str(channel_id),
'name': params[self._index(channel['name'])],
'preset': str(params[self._index(channel['lcn'])]),
'logo': params[self._index(logoindex)],
'stream': f'plugin://plugin.video.orange.fr/channel/{channel_id}',
'group': [group_name for group_name in self.groups if int(channel['id']) in self.groups[group_name]]
'group': [group_name for group_name in self.groups if channel_id in self.groups[group_name]]
})

return streams
Expand Down Expand Up @@ -142,6 +146,77 @@ def get_epg(self) -> dict:

return epg

def _index(self, name):
table = {}
for i in range(26):
table[chr(97+i)] = i + 1
table[chr(65+i)] = i + 27
table['_'] = 53
table['$'] = 54
index = 0
for car in name:
if car == "0":
break
index *= 54
index += table[car]
return index - 1

def _get_auth(self) -> tuple:
timestamp = datetime.timestamp(datetime.today())
filepath = os.path.join(xbmcvfs.translatePath(get_addon_profile()), 'auth')

req = Request("https://chaines-tv.orange.fr", headers={
'User-Agent': random_ua(),
'Host': 'chaines-tv.orange.fr',
})

with urlopen(req) as res:
html = res.read().decode()
nuxt = re.search('<script>(window.*?)</script>', html).expand(r'\1')
cookie = res.headers['Set-Cookie'].split(";")[0]
tv_token = 'Bearer ' + re.search('token:"(.*?)"', nuxt).expand(r'\1')
auth = {'timestamp': timestamp, 'cookie': cookie, 'tv_token': tv_token}
with open(filepath, 'w', encoding='UTF-8') as file:
file.write(json.dumps(auth))

return nuxt, cookie, tv_token

def _auth_urlopen(self, url: str, headers: dict = None) -> tuple:
if headers is None:
headers = {}
timestamp = datetime.timestamp(datetime.today())
filepath = os.path.join(xbmcvfs.translatePath(get_addon_profile()), 'auth')

try:
with open(filepath, encoding='UTF-8') as file:
auth = json.loads(file.read())
except FileNotFoundError:
auth = {'timestamp': timestamp}

for _ in range(2):
if 'cookie' in auth:
headers['cookie'] = auth['cookie']
headers['tv_token'] = auth['tv_token']
req = Request(url, headers=headers)

try:
with urlopen(req) as res:
if res.code == 200:
return res.read(), auth['cookie'], auth['tv_token']
except HTTPError as error:
if error.code == 403:
log("Cette chaîne ne fait pas partie de votre offre.", LogLevel.INFO)
break
if error.code == 401:
log(f"Cookie/token invalide, âge = {int(timestamp - auth['timestamp'])}", LogLevel.INFO)
else:
log(f"Erreur {error}", LogLevel.INFO)
raise

_, auth['cookie'], auth['tv_token'] = self._get_auth()

return None, None, None

def _get_programs(self, period_start: int = None, period_end: int = None) -> list:
"""Returns the programs for today (default) or the specified period"""
try:
Expand Down
4 changes: 2 additions & 2 deletions resources/lib/providers/fr/orange.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ class OrangeFranceProvider(OrangeTemplate):
# pylint: disable=line-too-long
def __init__(self) -> None:
super().__init__(
endpoint_stream_info = 'https://mediation-tv.orange.fr/all/live/v3/applications/PC/users/me/channels/{channel_id}/stream?terminalModel=WEB_PC',
endpoint_streams = 'https://mediation-tv.orange.fr/all/live/v3/applications/PC/channels?mco=OFR',
endpoint_stream_info = 'https://mediation-tv.orange.fr/all/api-gw/live/v3/auth/accountToken/applications/PC/channels/{channel_id}/stream?terminalModel=WEB_PC',
endpoint_streams = 'https://rp-ott-mediation-tv.woopic.com/api-gw/live/v3/applications/PC/programs?groupBy=channel&mco=OFR',
endpoint_programs = 'https://rp-ott-mediation-tv.woopic.com/api-gw/live/v3/applications/PC/programs?period={period}&mco=OFR',
groups = {
'TNT': \
Expand Down
Loading