-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmonitor.py
160 lines (138 loc) · 6.08 KB
/
monitor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
import requests
import schedule
import time
import os
from dotenv import load_dotenv
import jwt
import datetime
# Load environment variables
load_dotenv()
APP_ID = os.getenv("APP_ID")
PRIVATE_KEY_PATH = os.getenv("PRIVATE_KEY_PATH")
GITHUB_TOKEN = os.getenv("GITHUB_TOKEN") # Fallback token
SLACK_WEBHOOK_URL = os.getenv("SLACK_WEBHOOK_URL")
GITHUB_API_URL = "https://api.github.com"
# Constants
REQUEST_DELAY = 2 # Delay between API requests
PER_PAGE = 100 # Results per page
SEARCH_CONTENT = '"colors" primary' # Content to search within mint.json
USE_APP_AUTH = True # Set to False to use Personal Access Token instead of GitHub App
# Notify Slack with a custom message
def notify_slack_message(message):
response = requests.post(SLACK_WEBHOOK_URL, json={"text": message})
if response.status_code == 200:
print("Notification sent successfully.")
else:
print(f"Failed to send notification: {response.status_code} - {response.text}")
# Generate JWT for GitHub App authentication
def generate_jwt():
with open(PRIVATE_KEY_PATH, "r") as pem_file:
private_key = pem_file.read()
payload = {
"iat": int(time.time()), # Issued at time
"exp": int(time.time()) + 600, # Expires after 10 minutes
"iss": APP_ID # GitHub App ID
}
return jwt.encode(payload, private_key, algorithm="RS256")
# Retrieve an installation access token
def get_installation_access_token():
jwt_token = generate_jwt()
headers = {"Authorization": f"Bearer {jwt_token}", "Accept": "application/vnd.github.v3+json"}
response = requests.get(f"{GITHUB_API_URL}/app/installations", headers=headers)
if response.status_code != 200:
raise Exception(f"Failed to fetch installations: {response.text}")
installation_id = response.json()[0]["id"]
token_url = f"{GITHUB_API_URL}/app/installations/{installation_id}/access_tokens"
response = requests.post(token_url, headers=headers)
if response.status_code != 201:
raise Exception(f"Failed to get access token: {response.text}")
return response.json().get("token")
# Search for mint.json files containing specific content
def search_mint_json_all_pages():
headers = {"Accept": "application/vnd.github.v3+json"}
if USE_APP_AUTH:
access_token = get_installation_access_token()
headers["Authorization"] = f"token {access_token}"
print("Using GitHub App installation token.")
else:
headers["Authorization"] = f"token {GITHUB_TOKEN}"
print("Using Personal Access Token.")
query = f"{SEARCH_CONTENT} in:file filename:mint.json"
results = []
seen_repos = set()
start_page = 1
while True: # Loop until all pages are processed
print(f"\n=== Processing pages {start_page} to {start_page + 9} ===")
batch_has_results = False
for page in range(start_page, start_page + 10):
search_url = f"{GITHUB_API_URL}/search/code?q={query}&sort=indexed&order=desc&page={page}&per_page={PER_PAGE}"
print(f"Fetching results from: {search_url}")
response = requests.get(search_url, headers=headers)
if response.status_code == 200:
data = response.json()
elif response.status_code == 403:
print("Rate limit hit. Retrying after waiting...")
handle_rate_limit(response)
continue
else:
print(f"Error on page {page}: {response.json()}")
return results
num_results = len(data.get("items", []))
print(f"Page {page} returned {num_results} results.")
if not data.get("items"):
print(f"No more results found on page {page}.")
break
for item in data["items"]:
repo_url = item["repository"]["url"]
repo_name = item["repository"]["full_name"]
file_url = item["html_url"]
if repo_name not in seen_repos:
seen_repos.add(repo_name)
created_at = fetch_repo_creation_date(repo_url, headers)
results.append({
"repo_name": repo_name,
"code_url": file_url,
"created_at": created_at,
"page_found": page
})
batch_has_results = True
time.sleep(REQUEST_DELAY)
if not batch_has_results:
print("No more results found. Ending search.")
break
start_page += 10
return results
# Fetch repository's creation date
def fetch_repo_creation_date(repo_url, headers):
response = requests.get(repo_url, headers=headers)
if response.status_code == 200:
return response.json().get("created_at", "Unknown")
return "Unknown"
# Handle rate limit
def handle_rate_limit(response):
reset_time = int(response.headers.get("X-RateLimit-Reset", time.time() + 60))
wait_time = reset_time - int(time.time())
print(f"Rate limit hit. Waiting {wait_time} seconds...")
time.sleep(wait_time)
# Send results to Slack
def send_results_to_slack(results):
if not results:
notify_slack_message("No mint.json files with 'colors' and 'primary' were found.")
return
message = "🎉 *Top 10 Most Recently Created Repositories Containing mint.json:*\n"
results.sort(key=lambda x: x["created_at"], reverse=True)
for i, result in enumerate(results[:10], start=1):
message += f"• {i}. *Repo*: {result['repo_name']} | <{result['code_url']}|View File> | Created: {result['created_at']} | Page: {result['page_found']}\n"
notify_slack_message(message)
# Main function
def search_and_notify():
notify_slack_message("🚀 Searching for mint.json has started!")
results = search_mint_json_all_pages()
send_results_to_slack(results)
# Run script immediately and schedule
search_and_notify()
schedule.every(12).hours.do(search_and_notify) # Runs every 12 hours
print("Monitoring GitHub for mint.json updates and sending notifications to Slack every 12 hours...")
while True:
schedule.run_pending()
time.sleep(1)