Skip to content

Commit

Permalink
python sdk 5.0.2 (#900)
Browse files Browse the repository at this point in the history
1、Fix the bug related to message ID length
2、Fix the bug which sync the subscription relationship without expression
3、The subscribe method adds raise exception
4、Set default values for AK and SK with empty strings
  • Loading branch information
zhouli11 authored Dec 31, 2024
1 parent 7d55793 commit 4362aef
Show file tree
Hide file tree
Showing 14 changed files with 143 additions and 83 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/python_build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ on:
jobs:
flake8:
name: flake8
runs-on: ubuntu-latest
runs-on: ubuntu-22.04
steps:
- name: Checkout
uses: actions/checkout@v3
Expand All @@ -17,7 +17,7 @@ jobs:
run: |
flake8 --ignore=E501,W503 --exclude python/rocketmq/grpc_protocol python
isort:
runs-on: ubuntu-latest
runs-on: ubuntu-22.04
steps:
- name: Checkout
uses: actions/checkout@v3
Expand All @@ -30,7 +30,7 @@ jobs:
run: |
isort --check --diff --skip python/rocketmq/grpc_protocol python
black:
runs-on: ubuntu-latest
runs-on: ubuntu-22.04
steps:
- name: Checkout
uses: actions/checkout@v3
Expand Down
33 changes: 21 additions & 12 deletions python/example/async_producer_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,29 @@ def handle_send_result(result_future):


if __name__ == '__main__':
endpoints = "endpoints"
credentials = Credentials("ak", "sk")
endpoints = "foobar.com:8080"
credentials = Credentials()
# if auth enable
# credentials = Credentials("ak", "sk")
config = ClientConfiguration(endpoints, credentials)
topic = "topic"
producer = Producer(config, (topic,))

try:
producer = Producer(config, (topic,))
producer.startup()
msg = Message()
msg.topic = topic
msg.body = "hello, rocketmq.".encode('utf-8')
msg.tag = "rocketmq-send-message"
msg.keys = "send_async"
msg.add_property("send", "async")
send_result_future = producer.send_async(msg)
send_result_future.add_done_callback(handle_send_result)
try:
for i in range(10):
msg = Message()
msg.topic = topic
msg.body = "hello, rocketmq.".encode('utf-8')
msg.tag = "rocketmq-send-message"
msg.keys = "send_async"
msg.add_property("send", "async")
send_result_future = producer.send_async(msg)
send_result_future.add_done_callback(handle_send_result)
except Exception as e:
print(f"async producer{producer.__str__()} send message raise exception: {e}")
producer.shutdown()
except Exception as e:
print(f"async producer example raise exception: {e}")
print(f"{producer.__str__()} startup raise exception: {e}")
producer.shutdown()
37 changes: 25 additions & 12 deletions python/example/async_simple_consumer_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,40 @@

def receive_callback(receive_result_future, consumer):
messages = receive_result_future.result()
print(f"{consumer.__str__()} receive {len(messages)} messages.")
for msg in messages:
print(f"{consumer.__str__()} receive {len(messages)} messages in callback.")
try:
consumer.ack(msg)
print(f"receive and ack message:{msg.message_id} in callback.")
print(f"ack message:{msg.message_id}.")
except Exception as exception:
print(f"receive message callback raise exception: {exception}")
print(f"receive message raise exception: {exception}")


if __name__ == '__main__':
endpoints = "endpoints"
credentials = Credentials("ak", "sk")
endpoints = "foobar.com:8080"
credentials = Credentials()
# if auth enable
# credentials = Credentials("ak", "sk")
config = ClientConfiguration(endpoints, credentials)
topic = "topic"

simple_consumer = SimpleConsumer(config, "consumer-group")
try:
simple_consumer = SimpleConsumer(config, "consumer_group")
simple_consumer.startup()
simple_consumer.subscribe(topic)
while True:
time.sleep(5)
future = simple_consumer.receive_async(32, 15)
future.add_done_callback(functools.partial(receive_callback, consumer=simple_consumer))
try:
simple_consumer.subscribe(topic)
# use tag filter
# simple_consumer.subscribe(topic, FilterExpression("tag"))
while True:
try:
time.sleep(1)
future = simple_consumer.receive_async(32, 15)
future.add_done_callback(functools.partial(receive_callback, consumer=simple_consumer))
except Exception as e:
print(f"{simple_consumer.__str__()} receive topic:{topic} raise exception: {e}")
except Exception as e:
print(f"{simple_consumer.__str__()} subscribe topic:{topic} raise exception: {e}")
simple_consumer.shutdown()
except Exception as e:
print(f"simple consumer example raise exception: {e}")
print(f"{simple_consumer.__str__()} startup raise exception: {e}")
simple_consumer.shutdown()
36 changes: 22 additions & 14 deletions python/example/normal_producer_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,30 @@
from rocketmq import ClientConfiguration, Credentials, Message, Producer

if __name__ == '__main__':
endpoints = "endpoints"
credentials = Credentials("ak", "sk")
endpoints = "foobar.com:8080"
credentials = Credentials()
# if auth enable
# credentials = Credentials("ak", "sk")
config = ClientConfiguration(endpoints, credentials)
topic = "topic"
producer = Producer(config, (topic,))

try:
producer = Producer(config, (topic,))
producer.startup()
msg = Message()
msg.topic = topic
msg.body = "hello, rocketmq.".encode('utf-8')
msg.tag = "rocketmq-send-message"
msg.keys = "send_sync"
msg.add_property("send", "sync")
res = producer.send(msg)
print(f"{producer.__str__()} send message success. {res}")
producer.shutdown()
print(f"{producer.__str__()} shutdown.")
try:
msg = Message()
msg.topic = topic
msg.body = "hello, rocketmq.".encode('utf-8')
msg.tag = "rocketmq-send-message"
msg.keys = "send_sync"
msg.add_property("send", "sync")
res = producer.send(msg)
print(f"{producer.__str__()} send message success. {res}")
producer.shutdown()
print(f"{producer.__str__()} shutdown.")
except Exception as e:
print(f"normal producer example raise exception: {e}")
producer.shutdown()
except Exception as e:
print(f"normal producer example raise exception: {e}")
print(f"{producer.__str__()} startup raise exception: {e}")
producer.shutdown()
39 changes: 24 additions & 15 deletions python/example/simple_consumer_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,32 @@
from rocketmq import ClientConfiguration, Credentials, SimpleConsumer

if __name__ == '__main__':
endpoints = "endpoints"
credentials = Credentials("ak", "sk")
endpoints = "foobar.com:8080"
credentials = Credentials()
# if auth enable
# credentials = Credentials("ak", "sk")
config = ClientConfiguration(endpoints, credentials)
topic = "topic"
simple_consumer = SimpleConsumer(config, "consumer-group")
try:
simple_consumer = SimpleConsumer(config, "consumer-group")
simple_consumer.startup()
simple_consumer.subscribe(topic)
while True:
try:
messages = simple_consumer.receive(32, 15)
if messages is not None:
print(f"{simple_consumer.__str__()} receive {len(messages)} messages.")
for msg in messages:
simple_consumer.ack(msg)
print(f"{simple_consumer.__str__()} ack message:[{msg.message_id}].")
except Exception as e:
print(f"receive or ack message raise exception: {e}")
try:
simple_consumer.subscribe(topic)
# use tag filter
# simple_consumer.subscribe(topic, FilterExpression("tag"))
while True:
try:
messages = simple_consumer.receive(32, 15)
if messages is not None:
print(f"{simple_consumer.__str__()} receive {len(messages)} messages.")
for msg in messages:
simple_consumer.ack(msg)
print(f"{simple_consumer.__str__()} ack message:[{msg.message_id}].")
except Exception as e:
print(f"receive or ack message raise exception: {e}")
except Exception as e:
print(f"{simple_consumer.__str__()} subscribe topic:{topic} raise exception: {e}")
simple_consumer.shutdown()
except Exception as e:
print(f"simple consumer example raise exception: {e}")
print(f"{simple_consumer.__str__()} startup raise exception: {e}")
simple_consumer.shutdown()
17 changes: 12 additions & 5 deletions python/example/transaction_producer_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,20 @@ def check(self, message: Message) -> TransactionResolution:


if __name__ == '__main__':
endpoints = "endpoints"
credentials = Credentials("ak", "sk")
endpoints = "foobar.com:8080"
credentials = Credentials()
# if auth enable
# credentials = Credentials("ak", "sk")
config = ClientConfiguration(endpoints, credentials)
topic = "topic"
producer = Producer(config, (topic,))

try:
producer = Producer(config, (topic,), TestChecker())
producer.startup()
except Exception as e:
print(f"{producer.__str__()} startup raise exception: {e}")

try:
transaction = producer.begin_transaction()
msg = Message()
msg.topic = topic
Expand All @@ -40,6 +47,6 @@ def check(self, message: Message) -> TransactionResolution:
msg.keys = "send_transaction"
msg.add_property("send", "transaction")
res = producer.send(msg, transaction)
print(f"{producer.__str__()} send half message. {res}")
print(f"transaction producer{producer.__str__()} send half message success. {res}")
except Exception as e:
print(f"transaction producer example raise exception: {e}")
print(f"transaction producer{producer.__str__()} example raise exception: {e}")
33 changes: 23 additions & 10 deletions python/rocketmq/v5/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@

class Client:

CALLBACK_THREADS_NUM = 5

def __init__(self, client_configuration, topics, client_type: ClientType, tls_enable=False):
if client_configuration is None:
raise IllegalArgumentException("clientConfiguration should not be null.")
Expand All @@ -57,7 +59,7 @@ def __init__(self, client_configuration, topics, client_type: ClientType, tls_en
else:
self.__topics = set()
self.__callback_result_queue = Queue()
self.__callback_result_thread = None
self.__callback_threads = []
self.__is_running = False
self.__client_thread_task_enabled = False
self.__had_shutdown = False
Expand All @@ -76,7 +78,7 @@ def startup(self):
logger.warn(
f"update topic exception when client startup, ignore it, try it again in scheduler. exception: {e}")
self.__start_scheduler()
self.__start_callback_handler()
self.__start_async_rpc_callback_handler()
self.__is_running = True
self._start_success()
except Exception as e:
Expand Down Expand Up @@ -240,12 +242,19 @@ def __schedule_clear_idle_rpc_channels(self):

""" callback handler for async method """

def __start_callback_handler(self):
def __start_async_rpc_callback_handler(self):
# a thread to handle callback when using async method such as send_async(), receive_async().
# this handler switches user's callback thread from RpcClient's _io_loop_thread to client's callback_handler_thread
self.__callback_result_thread = threading.Thread(name="callback_handler_thread", target=self.__handle_callback)
self.__callback_result_thread.daemon = True
self.__callback_result_thread.start()
try:
for i in range(Client.CALLBACK_THREADS_NUM):
th = threading.Thread(name=f"callback_handler_thread-{i}", target=self.__handle_callback)
th.daemon = True
self.__callback_threads.append(th)
th.start()
logger.info(f"{self.__str__()} start async rpc callback thread:{th} success.")
except Exception as e:
print(f"{self.__str__()} start async rpc callback raise exception: {e}")
raise e

def __handle_callback(self):
while True:
Expand All @@ -263,7 +272,7 @@ def __handle_callback(self):
self.__callback_result_queue.task_done()
else:
break
logger.info(f"{self.__str__()} stop client callback result handler thread success.")
logger.info(f"{self.__str__()} stop client callback result handler thread:{threading.current_thread()} success.")

""" protect """

Expand Down Expand Up @@ -375,6 +384,7 @@ def __setting_write(self, endpoints):
req = self._sync_setting_req(endpoints)
callback = functools.partial(self.__setting_write_callback, endpoints=endpoints)
future = self.__rpc_client.telemetry_write_async(endpoints, req)
logger.debug(f"{self.__str__()} send setting to {endpoints.__str__()}, {req}")
future.add_done_callback(callback)

def __retrieve_telemetry_stream_stream_call(self, endpoints, rebuild=False):
Expand Down Expand Up @@ -466,9 +476,11 @@ def __stop_client_threads(self):
self.__clear_idle_rpc_channels_threading_event.set()
self.__clear_idle_rpc_channels_scheduler.join()

if self.__callback_result_thread is not None:
for i in range(Client.CALLBACK_THREADS_NUM):
self._set_future_callback_result(CallbackResult.end_callback_thread_result())
self.__callback_result_thread.join()

for i in range(Client.CALLBACK_THREADS_NUM):
self.__callback_threads[i].join()

self.__topic_route_scheduler = None
self.__topic_route_scheduler_threading_event = None
Expand All @@ -478,7 +490,8 @@ def __stop_client_threads(self):
self.__sync_setting_scheduler_threading_event = None
self.__clear_idle_rpc_channels_scheduler = None
self.__clear_idle_rpc_channels_threading_event = None
self.__callback_result_thread = None
self.__callback_result_queue = None
self.__callback_threads = None

""" property """

Expand Down
6 changes: 3 additions & 3 deletions python/rocketmq/v5/client/client_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@

class Credentials:

def __init__(self, ak, sk):
self.__ak = ak
self.__sk = sk
def __init__(self, ak="", sk=""):
self.__ak = ak if ak is not None else ""
self.__sk = sk if sk is not None else ""

@property
def ak(self):
Expand Down
4 changes: 3 additions & 1 deletion python/rocketmq/v5/consumer/simple_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ def subscribe(self, topic, filter_expression: FilterExpression = None):
self.__subscriptions.put(topic, filter_expression if filter_expression is not None else FilterExpression())
except Exception as e:
logger.error(f"subscribe exception: {e}")
raise e

def unsubscribe(self, topic):
if self.is_running is False:
Expand Down Expand Up @@ -141,14 +142,15 @@ def _sync_setting_req(self, endpoints):
sub_entry.topic.name = topic
sub_entry.topic.resource_namespace = self.client_configuration.namespace
sub_entry.expression.type = expression.filter_type
sub_entry.expression.expression = expression.expression

settings = Settings()
settings.client_type = self.client_type
settings.access_point.CopyFrom(endpoints.endpoints)
settings.request_timeout.seconds = self.client_configuration.request_timeout
settings.subscription.CopyFrom(subscription)
settings.user_agent.language = 6
settings.user_agent.version = "5.0.1.1"
settings.user_agent.version = Misc.sdk_version()
settings.user_agent.platform = Misc.get_os_description()
settings.user_agent.hostname = Misc.get_local_ip()
settings.metric.on = False
Expand Down
2 changes: 2 additions & 0 deletions python/rocketmq/v5/model/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ def body(self, body):

@topic.setter
def topic(self, topic):
if topic is None or topic.strip() == '':
raise IllegalArgumentException("topic has not been set yet")
if Misc.is_valid_topic(topic):
self.__topic = topic
else:
Expand Down
5 changes: 1 addition & 4 deletions python/rocketmq/v5/util/message_id_codec.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,7 @@ def next_message_id(self):

@staticmethod
def decode(message_id):
if len(message_id) == MessageIdCodec.MESSAGE_ID_LENGTH_FOR_V1_OR_LATER:
return message_id[2:]
else:
return message_id
return message_id

""" private """

Expand Down
Loading

0 comments on commit 4362aef

Please sign in to comment.