-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathtextgpt.py
331 lines (264 loc) · 12.1 KB
/
textgpt.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
from os import environ
from flask import Flask, request, redirect
from twilio.twiml.messaging_response import MessagingResponse
from twilio.rest import Client
import openai
from openai import OpenAIError
from messagedb import MessageDB, Message
from datetime import datetime
from imageprocessor import get_image_bytes_if_valid
# Credentials and the sending phone number are read from the process
# environment; each value is None when the variable is not set.
TWILIO_PHONE_NUMBER = environ.get("TWILIO_PHONE_NUMBER")
TWILIO_AUTH_TOKEN = environ.get("TWILIO_AUTH_TOKEN")
TWILIO_ACCOUNT_SID = environ.get("TWILIO_ACCOUNT_SID")
OPENAI_API_KEY = environ.get("OPENAI_API_KEY")

# Settings stored for a phone number the first time it texts in, and
# restored by the "#reset" command.
DEFAULT_SETTINGS = dict(
    model_id=18,  # gpt3.5-turbo
    system_prompt_id=1,
    stop_sequence=None,
    max_tokens=None,
    temperature=1.0,
    top_p=1.0,
    frequency_penalty=0.0,
    presence_penalty=0.0,
)
# Reply text for the "#help" command. It is sent to the user verbatim, so
# the wording here is user-facing.
HELP_STR = """TextGPT Commands:
#help
- prints this help message
#get [<key>|'settings']
- get your current setting for <key> or all settings
#set <key> <value>
- set your current setting for <key> to <value>
Keys for #get and #set:
- model: the OpenAI model to use
- system_prompt: the prompt to use to instruct the model of what to do
- stop_sequence: the sequence of tokens to use to stop the model from generating more tokens
- max_tokens: the maximum number of tokens to generate
- temperature: the sample temperature to use when generating tokens (higher = more random)
- top_p: the nucleus sampling probability to use when generating tokens (higher = more random)
- frequency_penalty: the frequency penalty to use when generating tokens (higher = more conservative)
- presence_penalty: the presence penalty to use when generating tokens (higher = more conservative)
#reset ['all'|'messages'|'settings']
- all: resets everything
- messages: resets all messages
- settings: resets all settings
#models
- prints all available models
#limits
- prints your current usage limits
#image <'create'|'edit'|'variation'> <prompt>
- create: creates an image from the prompt
- edit: edits an image from the prompt
- variation: creates a new variation of an image
"""
class textGPT(object):
    """SMS bot that relays Twilio text messages to the OpenAI API.

    Incoming messages and per-phone-number settings are persisted through
    ``MessageDB``. Messages that start with ``#`` are treated as commands
    (see ``HELP_STR``); everything else is answered with a chat completion
    built from the stored conversation.
    """

    def __init__(self, db_name='test.db', number=TWILIO_PHONE_NUMBER) -> None:
        """Create the Twilio client, open the database and load the model list.

        Args:
            db_name: Name handed to ``MessageDB`` to open the message store.
            number: Phone number that outgoing messages are sent from.
        """
        self.db_name = db_name
        self.client = Client(TWILIO_ACCOUNT_SID, TWILIO_AUTH_TOKEN)
        self.number = number
        self.mdb = MessageDB(self.db_name)
        # Register the number this instance actually sends from, so a caller
        # supplied ``number`` is honoured instead of the environment default.
        self.mdb.add_phone_number(self.number)
        self.update_models()

    def handle_incoming_message(self, message_values):
        """Store an incoming message, build a reply and send it back.

        Args:
            message_values: Mapping with a ``get`` method holding the webhook
                fields ``MessageSid``, ``From``, ``To``, ``Body`` and
                ``MediaUrl0``.

        Returns:
            The list of stored outgoing message records when the stored
            incoming record is truthy, otherwise the incoming record itself.
        """
        incoming_message_sid = message_values.get('MessageSid', None)
        from_phone_number = message_values.get('From', None)
        to_phone_number = message_values.get('To', None)
        # A missing Body would otherwise crash the startswith() check below.
        incoming_message_body = message_values.get('Body', None) or ''
        media_url = message_values.get('MediaUrl0', None)

        # A fresh MessageDB handle is opened for every request.
        self.mdb = MessageDB(self.db_name)
        incoming_message_obj = self.mdb.add_message(
            incoming_message_sid, from_phone_number, to_phone_number,
            incoming_message_body)

        # First contact from this number: store the defaults, then re-read.
        settings = self.mdb.get_settings_for_phone_number(from_phone_number)
        if settings is None:
            self.mdb.add_settings(from_phone_number, **DEFAULT_SETTINGS)
            settings = self.mdb.get_settings_for_phone_number(
                from_phone_number)

        if incoming_message_body.startswith('#'):
            outgoing_message_body = self.handle_command(
                incoming_message_obj, settings, media_url)
        else:
            outgoing_message_body = self.get_message_response(
                incoming_message_obj, settings)

        outgoing_message_objs = self.send_message(
            to_phone_number=from_phone_number, body=outgoing_message_body)
        return incoming_message_obj and outgoing_message_objs

    def send_message(self, to_phone_number, body, chunk_size=1600):
        """Send ``body`` to ``to_phone_number``, split into chunks if needed.

        A chunk that starts with ``http`` is sent as a media URL instead of
        as text.

        Args:
            to_phone_number: Recipient phone number.
            body: Text (or media URL) to send.
            chunk_size: Maximum number of characters per outgoing message.

        Returns:
            List of the records stored for each message sent; empty when
            there is nothing to send.
        """
        if not body:
            # Nothing to deliver; avoid asking Twilio to send an empty message.
            return []

        chunks = [body[start:start + chunk_size]
                  for start in range(0, len(body), chunk_size)]
        self.mdb = MessageDB(self.db_name)
        outgoing_message_objs = []
        for chunk in chunks:
            is_media = chunk.startswith('http')
            message = self.client.messages.create(
                from_=self.number,
                to=to_phone_number,
                body=None if is_media else chunk,
                media_url=chunk if is_media else None)
            # Record exactly what this message carried. Storing the whole
            # body for every chunk would duplicate it in the conversation.
            outgoing_message_obj = self.mdb.add_message(
                message.sid, self.number, to_phone_number, chunk)
            outgoing_message_objs.append(outgoing_message_obj)
        return outgoing_message_objs

    def get_message_response(self, incoming_message_obj, settings=None):
        """Build the chat history for the sender and ask OpenAI for a reply.

        Args:
            incoming_message_obj: Stored message; its ``from_phone_number``
                selects the conversation.
            settings: Mapping of the sender's settings. ``system_prompt`` and
                ``stop_sequence`` are consumed here; the remaining entries are
                forwarded as keyword arguments to the chat request.

        Returns:
            The reply text, or the error text if the OpenAI call failed.
        """
        # Work on a copy so the pops below never alter the caller's mapping.
        settings = {} if settings is None else dict(settings)
        user_phone_number = incoming_message_obj.from_phone_number
        text_messages = self.mdb.get_messages_for_phone_number(
            user_phone_number)

        openai_messages = []
        system_prompt = settings.pop('system_prompt', None)
        if system_prompt is not None:
            openai_messages.append(
                {'role': 'system', 'content': system_prompt})

        for message_obj in text_messages:
            content = message_obj.body or ''
            if content.startswith('#'):
                # Commands are not part of the conversation.
                continue
            if message_obj.from_phone_number == user_phone_number:
                role = 'user'
            elif message_obj.from_phone_number == self.number:
                role = 'assistant'
            else:
                print(
                    'Error: message_obj.from_phone_number not in [user_phone_number, self.number]')
                # Skip it: there is no valid role for this message.
                continue
            openai_messages.append({'role': role, 'content': content})

        settings['stop'] = settings.pop('stop_sequence', None)
        return self.openai_get_chat(openai_messages, **settings)

    def handle_command(self, incoming_message_obj, settings=None, media_url=None):
        """Execute a ``#`` command and return the text to reply with.

        The verb and the first argument are matched case-insensitively; the
        trailing free-text argument (a setting value or an image prompt)
        keeps the sender's casing.

        Args:
            incoming_message_obj: Stored message whose ``body`` starts with
                ``#``.
            settings: Mapping of the sender's current settings.
            media_url: URL of the first attached media item, if any; used by
                ``#image edit`` and ``#image variation``.

        Returns:
            The reply text; an error line when the command is not recognised.
        """
        settings = {} if settings is None else settings
        settings_params = ['model', 'system_prompt', 'stop_sequence',
                           'max_tokens', 'temperature', 'top_p',
                           'frequency_penalty', 'presence_penalty']
        user_phone_number = incoming_message_obj.from_phone_number
        raw_command = incoming_message_obj.body[1:]
        command = raw_command.lower()

        # Assume the command is invalid until a branch below accepts it.
        reply = f'Error: {command} is not a valid command.'

        parts = raw_command.split(maxsplit=2)
        if not parts:
            # A bare "#" carries no verb at all.
            return reply
        verb = parts[0].lower()
        args = parts[1:]
        if args:
            args[0] = args[0].lower()

        if verb == 'help' and not args:
            reply = HELP_STR
        elif verb == 'get' and len(args) == 1:
            param = args[0]
            if param == 'settings':
                settings_str = '\n'.join(
                    f'{k}:\n{v}' for k, v in settings.items())
                reply = f'Your settings are:\n{settings_str}'
            elif param in settings_params:
                reply = f'Your current {param} is:\n{settings.get(param)}'
        elif verb == 'set' and len(args) == 2:
            param, value = args
            if param in settings_params:
                self.mdb.update_settings_for_phone_number(
                    user_phone_number, **{param: value})
                reply = f'Your {param} has been set to:\n{value}'
        elif verb == 'reset' and len(args) < 2:
            # With no argument, reset everything.
            param = args[0] if args else 'all'
            if param == 'settings':
                self.mdb.update_settings_for_phone_number(
                    user_phone_number, **DEFAULT_SETTINGS)
                reply = 'Your settings have been reset to defaults'
            elif param == 'messages':
                self.mdb.delete_messages_for_phone_number(user_phone_number)
                reply = 'Your conversation has been reset.'
            elif param == 'all':
                self.mdb.update_settings_for_phone_number(
                    user_phone_number, **DEFAULT_SETTINGS)
                self.mdb.delete_messages_for_phone_number(user_phone_number)
                reply = 'Your settings and conversation have been reset.'
        elif verb == 'models' and not args:
            reply = 'Available Models:\n' + '\n'.join(self.all_models)
        elif verb == 'limits' and not args:
            try:
                reply = self.get_user_limits(user_phone_number)
            except NotImplementedError:
                # Answer the user instead of failing the whole request.
                reply = 'Error: usage limits are not implemented yet.'
        elif verb == 'image' and args:
            param = args[0]
            prompt = args[1] if len(args) == 2 else ''
            if param == 'create':
                reply = self.openai_get_image(prompt)
            elif param == 'edit':
                reply = self.openai_get_image_edit(prompt, media_url=media_url)
            elif param == 'variation':
                reply = self.openai_get_image_variation(
                    prompt, media_url=media_url)
        return reply

    def update_models(self):
        """Fetch the model ids from OpenAI, store them and return them sorted."""
        self.all_models = sorted(
            model['id'] for model in openai.Model.list()['data'])
        for model in self.all_models:
            self.mdb.add_model(model)
        return self.all_models

    def try_openai(self, getter_fn, parser_fn, **kwargs):
        """Call ``getter_fn(**kwargs)`` and return ``parser_fn`` of its result.

        An ``OpenAIError`` is printed and its text is returned instead, so the
        caller always receives something that can be sent back to the user.
        """
        try:
            response = getter_fn(**kwargs)
            return parser_fn(response)
        except OpenAIError as e:
            print(e)
            return str(e)

    def openai_get_chat(self, messages=None, n=1, **kwargs):
        """Request a chat completion and return the first choice's text, stripped."""
        messages = [] if messages is None else messages
        return self.try_openai(
            getter_fn=openai.ChatCompletion.create,
            parser_fn=lambda response: response.choices[0].message.content.strip(),
            messages=messages,
            n=n, **kwargs)

    def parse_size_from_prompt(self, prompt, default_size='256x256'):
        """Extract an image size such as ``512x512`` from ``prompt``.

        Returns:
            ``(size, prompt_without_size)``; ``(default_size, prompt)`` when
            no supported size appears in the prompt.
        """
        for size in ('256x256', '512x512', '1024x1024'):
            # Prompts keep the sender's casing, so accept "512X512" as well.
            for token in (size, size.upper()):
                if token in prompt:
                    return size, prompt.replace(token, '')
        return default_size, prompt

    def openai_get_image(self, prompt, n=1, **kwargs):
        """Generate an image from ``prompt`` and return the URL of the first result."""
        size, prompt = self.parse_size_from_prompt(prompt)
        return self.try_openai(
            getter_fn=openai.Image.create,
            parser_fn=lambda response: response.data[0].url,
            size=size,
            prompt=prompt,
            n=n, **kwargs)

    def openai_get_image_edit(self, prompt, media_url, n=1, **kwargs):
        """Edit the image at ``media_url`` according to ``prompt``.

        Returns:
            The URL of the first result, or an error message.
        """
        size, prompt = self.parse_size_from_prompt(prompt)
        image_bytes, mask_bytes = get_image_bytes_if_valid(media_url)
        if not isinstance(image_bytes, bytes):
            # On failure the helper's first element is the error text.
            return image_bytes
        return self.try_openai(
            getter_fn=openai.Image.create_edit,
            parser_fn=lambda response: response.data[0].url,
            size=size,
            prompt=prompt,
            image=image_bytes,
            mask=mask_bytes,
            n=n, **kwargs)

    def openai_get_image_variation(self, prompt, media_url, n=1, **kwargs):
        """Create a variation of the image at ``media_url``.

        ``prompt`` is only inspected for an optional size such as ``512x512``.

        Returns:
            The URL of the first result, or an error message.
        """
        size, prompt = self.parse_size_from_prompt(prompt)
        image_bytes, mask_bytes = get_image_bytes_if_valid(media_url)
        if not isinstance(image_bytes, bytes):
            # On failure the helper's first element is the error text.
            return image_bytes
        return self.try_openai(
            getter_fn=openai.Image.create_variation,
            parser_fn=lambda response: response.data[0].url,
            size=size,
            image=image_bytes,
            n=n, **kwargs)

    def get_user_limits(self, user_phone_number):
        """Return the usage limits for ``user_phone_number`` (not implemented yet).

        Raises:
            NotImplementedError: Always.
        """
        # TODO: implement token limiting and message limiting
        raise NotImplementedError
# Flask application exposing the webhook that receives Twilio SMS posts.
app = Flask(__name__)


@app.route("/sms", methods=["POST"])
def incoming_sms():
    """Hand the posted form values to the bot and answer with an empty body.

    NOTE(review): relies on the module-level ``textgpt`` instance, which is
    only created when this file is run as a script.
    """
    textgpt.handle_incoming_message(request.values)
    return ""
if __name__ == "__main__":
    # Pass the phone number by keyword: the first positional parameter of
    # textGPT is db_name, so a positional call would use the phone number as
    # the database name.
    # NOTE(review): this switches the store to the default db_name; if an
    # existing deployment relied on the old file, pass db_name explicitly.
    textgpt = textGPT(number=TWILIO_PHONE_NUMBER)
    textgpt.mdb.add_system_prompt(
        "Your role is to respond to texts like Eric Cartman from South Park.")
    app.run(debug=False, port=6969)