diff --git a/salt/minion.py b/salt/minion.py index 2c2585637420..034b3b032cf2 100644 --- a/salt/minion.py +++ b/salt/minion.py @@ -17,6 +17,7 @@ import time import traceback import types +import uuid import salt import salt.beacons @@ -1071,8 +1072,10 @@ def _bind(self): @salt.ext.tornado.gen.coroutine def handle_event(self, package): - for minion in self.minions: - minion.handle_event(package) + try: + yield [_.handle_event(package) for _ in self.minions] + except Exception as exc: # pylint: disable=broad-except + log.error("Error dispatching event. %s", exc) def _create_minion_object( self, @@ -1397,13 +1400,8 @@ def connect_master(self, failed=False): self.req_channel = salt.channel.client.AsyncReqChannel.factory( self.opts, io_loop=self.io_loop ) - - if hasattr( - self.req_channel, "connect" - ): # TODO: consider generalizing this for all channels - log.debug("Connecting minion's long-running req channel") - yield self.req_channel.connect() - + log.debug("Connecting minion's long-running req channel") + yield self.req_channel.connect() yield self._post_master_init(master) @salt.ext.tornado.gen.coroutine @@ -1626,6 +1624,7 @@ def _load_modules( return functions, returners, errors, executors def _send_req_sync(self, load, timeout): + # XXX: Signing should happen in RequestChannel to be fixed in 3008 if self.opts["minion_sign_messages"]: log.trace("Signing event to be published onto the bus.") minion_privkey_path = os.path.join(self.opts["pki_dir"], "minion.pem") @@ -1633,18 +1632,25 @@ def _send_req_sync(self, load, timeout): minion_privkey_path, salt.serializers.msgpack.serialize(load) ) load["sig"] = sig - - with salt.utils.event.get_event( - "minion", opts=self.opts, listen=False - ) as event: - return event.fire_event( + with salt.utils.event.get_event("minion", opts=self.opts, listen=True) as event: + request_id = str(uuid.uuid4()) + log.trace("Send request to main id=%s", request_id) + event.fire_event( load, - 
f"__master_req_channel_payload/{self.opts['master']}", + f"__master_req_channel_payload/{request_id}/{self.opts['master']}", timeout=timeout, ) + ret = event.get_event( + tag=f"__master_req_channel_return/{request_id}", + wait=timeout, + ) + log.trace("Reply from main %s", request_id) + return ret["ret"] @salt.ext.tornado.gen.coroutine def _send_req_async(self, load, timeout): + # XXX: Signing should happen in RequestChannel to be fixed in 3008 + # XXX: This is only used by syndic if self.opts["minion_sign_messages"]: log.trace("Signing event to be published onto the bus.") minion_privkey_path = os.path.join(self.opts["pki_dir"], "minion.pem") @@ -1652,31 +1658,49 @@ def _send_req_async(self, load, timeout): minion_privkey_path, salt.serializers.msgpack.serialize(load) ) load["sig"] = sig - - with salt.utils.event.get_event( - "minion", opts=self.opts, listen=False - ) as event: - ret = yield event.fire_event_async( + with salt.utils.event.get_event("minion", opts=self.opts, listen=True) as event: + request_id = str(uuid.uuid4()) + log.trace("Send request to main id=%s", request_id) + yield event.fire_event_async( load, - f"__master_req_channel_payload/{self.opts['master']}", + f"__master_req_channel_payload/{request_id}/{self.opts['master']}", timeout=timeout, ) - raise salt.ext.tornado.gen.Return(ret) + start = time.time() + while time.time() - start < timeout: + ret = event.get_event( + tag=f"__master_req_channel_return/{request_id}", no_block=True + ) + if ret: + break + yield salt.ext.tornado.gen.sleep(0.3) + else: + raise TimeoutError("Did not recieve return event") + log.trace("Reply from main %s", request_id) + raise salt.ext.tornado.gen.Return(ret["ret"]) - def _fire_master( - self, - data=None, - tag=None, - events=None, - pretag=None, - timeout=60, - sync=True, - timeout_handler=None, - include_startup_grains=False, - ): + @salt.ext.tornado.gen.coroutine + def _send_req_async_main(self, load, timeout): """ - Fire an event on the master, or drop message 
if unable to send. + Send a request to the master's request server. To be called from the + top level process in the main thread only. Worker threads and + processes should call _send_req_sync or _send_req_async as necessary. """ + if self.opts["minion_sign_messages"]: + log.trace("Signing event to be published onto the bus.") + minion_privkey_path = os.path.join(self.opts["pki_dir"], "minion.pem") + sig = salt.crypt.sign_message( + minion_privkey_path, salt.serializers.msgpack.serialize(load) + ) + load["sig"] = sig + ret = yield self.req_channel.send( + load, timeout=timeout, tries=self.opts["return_retry_tries"] + ) + raise salt.ext.tornado.gen.Return(ret) + + def _fire_master_prepare( + self, data, tag, events, pretag, include_startup_grains=False + ): load = { "id": self.opts["id"], "cmd": "_minion_event", @@ -1701,35 +1725,62 @@ def _fire_master( if k in self.opts["start_event_grains"] } load["grains"] = grains_to_add + return load - if sync: - try: - self._send_req_sync(load, timeout) - except salt.exceptions.SaltReqTimeoutError: + @salt.ext.tornado.gen.coroutine + def _fire_master_main( + self, + data=None, + tag=None, + events=None, + pretag=None, + timeout=60, + timeout_handler=None, + include_startup_grains=False, + ): + load = self._fire_master_prepare( + data, tag, events, pretag, include_startup_grains + ) + if timeout_handler is None: + + def handle_timeout(*_): log.info( - "fire_master failed: master could not be contacted. Request timed" - " out." + "fire_master failed: master could not be contacted. Request" + " timed out." ) - return False - except Exception: # pylint: disable=broad-except - log.info("fire_master failed: %s", traceback.format_exc()) - return False - else: - if timeout_handler is None: + return True - def handle_timeout(*_): - log.info( - "fire_master failed: master could not be contacted. Request" - " timed out."
- ) - return True + timeout_handler = handle_timeout - timeout_handler = handle_timeout + yield self._send_req_async_main(load, timeout) - with salt.ext.tornado.stack_context.ExceptionStackContext(timeout_handler): - # pylint: disable=unexpected-keyword-arg - self._send_req_async(load, timeout, callback=lambda f: None) - # pylint: enable=unexpected-keyword-arg + def _fire_master( + self, + data=None, + tag=None, + events=None, + pretag=None, + timeout=60, + timeout_handler=None, + include_startup_grains=False, + ): + """ + Fire an event on the master, or drop message if unable to send. + """ + load = self._fire_master_prepare( + data, tag, events, pretag, include_startup_grains + ) + try: + self._send_req_sync(load, timeout) + except salt.exceptions.SaltReqTimeoutError: + log.info( + "fire_master failed: master could not be contacted. Request timed" + " out." + ) + return False + except Exception: # pylint: disable=broad-except + log.info("fire_master failed: %s", traceback.format_exc()) + return False return True @salt.ext.tornado.gen.coroutine @@ -2228,10 +2279,7 @@ def _thread_multi_return(cls, minion_instance, opts, data): except Exception as exc: # pylint: disable=broad-except log.error("The return failed for job %s: %s", data["jid"], exc) - def _return_pub(self, ret, ret_cmd="_return", timeout=60, sync=True): - """ - Return the data from the executed command to the master server - """ + def _prepare_return_pub(self, ret, ret_cmd="_return"): jid = ret.get("jid", ret.get("__jid__")) fun = ret.get("fun", ret.get("__fun__")) if self.opts["multiprocessing"]: @@ -2285,7 +2333,12 @@ def _return_pub(self, ret, ret_cmd="_return", timeout=60, sync=True): if ret["jid"] == "req": ret["jid"] = salt.utils.jid.gen_jid(self.opts) salt.utils.minion.cache_jobs(self.opts, ret["jid"], ret) + return load + @salt.ext.tornado.gen.coroutine + def _return_pub_main(self, ret, ret_cmd="_return", timeout=60): + jid = ret.get("jid", ret.get("__jid__")) + load = 
self._prepare_return_pub(ret, ret_cmd) if not self.opts["pub_ret"]: return "" @@ -2299,20 +2352,38 @@ def timeout_handler(*_): ) return True - if sync: - try: - ret_val = self._send_req_sync(load, timeout=timeout) - except SaltReqTimeoutError: - timeout_handler() - return "" - else: - with salt.ext.tornado.stack_context.ExceptionStackContext(timeout_handler): - # pylint: disable=unexpected-keyword-arg - ret_val = self._send_req_async( - load, timeout=timeout, callback=lambda f: None - ) - # pylint: enable=unexpected-keyword-arg + try: + ret_val = yield self._send_req_async_main(load, timeout=timeout) + except SaltReqTimeoutError: + timeout_handler() + ret_val = "" + log.trace("ret_val = %s", ret_val) # pylint: disable=no-member + raise salt.ext.tornado.gen.Return(ret_val) + def _return_pub(self, ret, ret_cmd="_return", timeout=60): + """ + Return the data from the executed command to the master server + """ + jid = ret.get("jid", ret.get("__jid__")) + load = self._prepare_return_pub(ret, ret_cmd) + if not self.opts["pub_ret"]: + return "" + + def timeout_handler(*_): + log.warning( + "The minion failed to return the job information for job %s. " + "This is often due to the master being shut down or " + "overloaded. If the master is running, consider increasing " + "the worker_threads value.", + jid, + ) + return True + + try: + ret_val = self._send_req_sync(load, timeout=timeout) + except SaltReqTimeoutError: + timeout_handler() + return "" log.trace("ret_val = %s", ret_val) # pylint: disable=no-member return ret_val @@ -2320,6 +2391,9 @@ def _return_pub_multi(self, rets, ret_cmd="_return", timeout=60, sync=True): """ Return the data from the executed command to the master server """ + # XXX: This is only used by syndic and should be moved to the Syndic class. + # XXX: The sync flag is only called with sync=False. Which also means + # deprecating sync means we can remove Minion._send_req_async. 
if not isinstance(rets, list): rets = [rets] jids = {} @@ -2460,13 +2534,13 @@ def _fire_master_minion_start(self): # Send an event to the master that the minion is live if self.opts["enable_legacy_startup_events"]: # Old style event. Defaults to False in 3001 release. - self._fire_master( + self._fire_master_main( "Minion {} started at {}".format(self.opts["id"], time.asctime()), "minion_start", include_startup_grains=include_grains, ) # send name spaced event - self._fire_master( + self._fire_master_main( "Minion {} started at {}".format(self.opts["id"], time.asctime()), tagify([self.opts["id"], "start"], "minion"), include_startup_grains=include_grains, @@ -2749,21 +2823,35 @@ def handle_event(self, package): notify=data.get("notify", False), ) elif tag.startswith("__master_req_channel_payload"): - job_master = tag.rsplit("/", 1)[1] + request_id, job_master = tag.rsplit("/", 2)[1:] if job_master == self.opts["master"]: + ret = None try: - yield _minion.req_channel.send( + ret = yield _minion.req_channel.send( data, timeout=_minion._return_retry_timer(), tries=_minion.opts["return_retry_tries"], ) except salt.exceptions.SaltReqTimeoutError: - log.error("Timeout encountered while sending %r request", data) + log.error( + "Timeout encountered while sending %r request. 
id=%s", + data, + request_id, + ) + raise salt.ext.tornado.gen.Return() + with salt.utils.event.get_event( + "minion", opts=self.opts, listen=False + ) as event: + yield event.fire_event_async( + {"ret": ret}, + f"__master_req_channel_return/{request_id}", + ) else: log.debug( - "Skipping req for other master: cmd=%s master=%s", + "Skipping req for other master: cmd=%s master=%s id=%s", data["cmd"], job_master, + request_id, ) elif tag.startswith("pillar_refresh"): yield _minion.pillar_refresh( @@ -2791,13 +2879,22 @@ def handle_event(self, package): self._mine_send(tag, data) elif tag.startswith("fire_master"): if self.connected: - log.debug("Forwarding master event tag=%s", data["tag"]) - self._fire_master( + log.debug( + "Forwarding event %s to master %s", + data["tag"], + self.opts["master"], + ) + yield self._fire_master_main( data["data"], data["tag"], data["events"], data["pretag"], - sync=False, + ) + else: + log.debug( + "Master %s is not connected, dropping event %s", + self.opts["master"], + data["tag"], ) elif tag.startswith(master_event(type="disconnected")) or tag.startswith( master_event(type="failback") @@ -2865,6 +2962,7 @@ def handle_event(self, package): self.req_channel = salt.channel.client.AsyncReqChannel.factory( self.opts, io_loop=self.io_loop ) + yield self.req_channel.connect() # put the current schedule into the new loaders self.opts["schedule"] = self.schedule.option("schedule") @@ -2954,11 +3052,11 @@ def handle_event(self, package): 1 ], ) - self._return_pub(data, ret_cmd="_return", sync=False) + yield self._return_pub_main(data, ret_cmd="_return") elif tag.startswith("_salt_error"): if self.connected: log.debug("Forwarding salt error event tag=%s", tag) - self._fire_master(data, tag, sync=False) + yield self._fire_master_main(data, tag) elif tag.startswith("salt/auth/creds"): key = tuple(data["key"]) log.debug( @@ -2971,7 +3069,7 @@ def handle_event(self, package): elif tag.startswith("__beacons_return"): if self.connected: 
log.debug("Firing beacons to master") - self._fire_master(events=data["beacons"]) + yield self._fire_master_main(events=data["beacons"]) def cleanup_subprocesses(self): """ @@ -3169,10 +3267,9 @@ def ping_timeout_handler(*_): "minion is running under an init system." ) - self._fire_master( + self._fire_master_main( "ping", "minion_ping", - sync=False, timeout_handler=ping_timeout_handler, ) except Exception: # pylint: disable=broad-except @@ -3373,12 +3470,10 @@ def fire_master_syndic_start(self): self._fire_master( "Syndic {} started at {}".format(self.opts["id"], time.asctime()), "syndic_start", - sync=False, ) self._fire_master( "Syndic {} started at {}".format(self.opts["id"], time.asctime()), tagify([self.opts["id"], "start"], "syndic"), - sync=False, ) # TODO: clean up docs @@ -3769,7 +3864,7 @@ def _forward_events(self): "events": events, "pretag": tagify(self.opts["id"], base="syndic"), "timeout": self._return_retry_timer(), - "sync": False, + "sync": True, # Sync needs to be true unless being called from a coroutine }, ) if self.delayed: diff --git a/salt/utils/asynchronous.py b/salt/utils/asynchronous.py index f0048ff19102..016fe1748d00 100644 --- a/salt/utils/asynchronous.py +++ b/salt/utils/asynchronous.py @@ -50,7 +50,7 @@ def __init__( close_methods=None, loop_kwarg=None, ): - self.io_loop = salt.ext.tornado.ioloop.IOLoop() + self.io_loop = salt.ext.tornado.ioloop.IOLoop(make_current=False) if args is None: args = [] if kwargs is None: @@ -63,7 +63,8 @@ def __init__( self.cls = cls if loop_kwarg: kwargs[self.loop_kwarg] = self.io_loop - self.obj = cls(*args, **kwargs) + with current_ioloop(self.io_loop): + self.obj = cls(*args, **kwargs) self._async_methods = list( set(async_methods + getattr(self.obj, "async_methods", [])) ) diff --git a/salt/utils/event.py b/salt/utils/event.py index 983402875d66..b5b0b06ca40b 100644 --- a/salt/utils/event.py +++ b/salt/utils/event.py @@ -75,6 +75,7 @@ import salt.utils.process import salt.utils.stringutils import 
salt.utils.zeromq +from salt.exceptions import SaltInvocationError log = logging.getLogger(__name__) @@ -567,6 +568,9 @@ def _get_event(self, wait, tag, match_func=None, no_block=False): try: if not self.cpub and not self.connect_pub(timeout=wait): break + if not self._run_io_loop_sync: + log.error("Trying to get event with async subscriber") + raise SaltInvocationError("get_event needs synchornous subscriber") raw = self.subscriber.read(timeout=wait) if raw is None: break diff --git a/tests/pytests/integration/minion/test_schedule_large_event.py b/tests/pytests/integration/minion/test_schedule_large_event.py new file mode 100644 index 000000000000..3162cbfe5d6b --- /dev/null +++ b/tests/pytests/integration/minion/test_schedule_large_event.py @@ -0,0 +1,107 @@ +import sys + +import pytest + +import salt.utils.event +import salt.utils.platform +import tests.support.helpers +from tests.conftest import FIPS_TESTRUN + + +@pytest.fixture +def salt_master_1(request, salt_factories): + config_defaults = { + "open_mode": True, + "transport": request.config.getoption("--transport"), + } + config_overrides = { + "interface": "127.0.0.1", + "fips_mode": FIPS_TESTRUN, + "publish_signing_algorithm": ( + "PKCS1v15-SHA224" if FIPS_TESTRUN else "PKCS1v15-SHA1" + ), + } + + factory = salt_factories.salt_master_daemon( + "master-1", + defaults=config_defaults, + overrides=config_overrides, + extra_cli_arguments_after_first_start_failure=["--log-level=info"], + ) + with factory.started(start_timeout=120): + yield factory + + +@pytest.fixture +def salt_minion_1(salt_master_1): + config_defaults = { + "transport": salt_master_1.config["transport"], + } + master_1_port = salt_master_1.config["ret_port"] + master_1_addr = salt_master_1.config["interface"] + config_overrides = { + "master": [ + f"{master_1_addr}:{master_1_port}", + ], + "test.foo": "baz", + "fips_mode": FIPS_TESTRUN, + "encryption_algorithm": "OAEP-SHA224" if FIPS_TESTRUN else "OAEP-SHA1", + "signing_algorithm": 
"PKCS1v15-SHA224" if FIPS_TESTRUN else "PKCS1v15-SHA1", + } + factory = salt_master_1.salt_minion_daemon( + "minion-1", + defaults=config_defaults, + overrides=config_overrides, + extra_cli_arguments_after_first_start_failure=["--log-level=info"], + ) + with factory.started(start_timeout=120): + yield factory + + +@pytest.fixture +def script(salt_minion_1, tmp_path): + path = tmp_path / "script.py" + content = f""" + import salt.config + import salt.utils.event + + opts = salt.config.minion_config('{salt_minion_1.config_file}') + + payload = b'0' * 1048576000 + + big_event = dict() + for i in range(10000): + big_event[i] = payload = b'0' * 100 + + with salt.utils.event.get_event("minion", opts=opts) as event: + event.fire_master(big_event, 'bigevent') + + """ + path.write_text(tests.support.helpers.dedent(content)) + return path + + +# @pytest.mark.timeout_unless_on_windows(360) +def test_schedule_large_event(salt_master_1, salt_minion_1, script): + cli = salt_master_1.salt_cli(timeout=120) + ret = cli.run( + "schedule.add", + name="myjob", + function="cmd.run", + seconds=5, + job_args=f'["{sys.executable} {script}"]', + minion_tgt=salt_minion_1.id, + ) + assert "result" in ret.data + assert ret.data["result"] + with salt.utils.event.get_event( + "master", + salt_master_1.config["sock_dir"], + salt_master_1.config["transport"], + salt_master_1.config, + listen=True, + ) as event: + event = event.get_event(tag="bigevent", wait=15) + assert event + assert "data" in event + assert len(event["data"]) == 10000 diff --git a/tests/pytests/scenarios/multimaster/conftest.py b/tests/pytests/scenarios/multimaster/conftest.py index 84e7a9a3ceb1..481a4a433ef5 100644 --- a/tests/pytests/scenarios/multimaster/conftest.py +++ b/tests/pytests/scenarios/multimaster/conftest.py @@ -25,6 +25,12 @@ def salt_mm_master_1(request, salt_factories): "publish_signing_algorithm": ( "PKCS1v15-SHA224" if FIPS_TESTRUN else "PKCS1v15-SHA1" ), + "log_granular_levels": { + "salt": "info", + 
"salt.transport": "debug", + "salt.channel": "debug", + "salt.utils.event": "debug", + }, } factory = salt_factories.salt_master_daemon( "mm-master-1", @@ -56,6 +62,12 @@ def salt_mm_master_2(salt_factories, salt_mm_master_1): "publish_signing_algorithm": ( "PKCS1v15-SHA224" if FIPS_TESTRUN else "PKCS1v15-SHA1" ), + "log_granular_levels": { + "salt": "info", + "salt.transport": "debug", + "salt.channel": "debug", + "salt.utils.event": "debug", + }, } # Use the same ports for both masters, they are binding to different interfaces @@ -106,6 +118,13 @@ def salt_mm_minion_1(salt_mm_master_1, salt_mm_master_2): "fips_mode": FIPS_TESTRUN, "encryption_algorithm": "OAEP-SHA224" if FIPS_TESTRUN else "OAEP-SHA1", "signing_algorithm": "PKCS1v15-SHA224" if FIPS_TESTRUN else "PKCS1v15-SHA1", + "log_granular_levels": { + "salt": "info", + "salt.minion": "debug", + "salt.transport": "debug", + "salt.channel": "debug", + "salt.utils.event": "debug", + }, } factory = salt_mm_master_1.salt_minion_daemon( "mm-minion-1", @@ -136,6 +155,13 @@ def salt_mm_minion_2(salt_mm_master_1, salt_mm_master_2): "fips_mode": FIPS_TESTRUN, "encryption_algorithm": "OAEP-SHA224" if FIPS_TESTRUN else "OAEP-SHA1", "signing_algorithm": "PKCS1v15-SHA224" if FIPS_TESTRUN else "PKCS1v15-SHA1", + "log_granular_levels": { + "salt": "info", + "salt.minion": "debug", + "salt.transport": "debug", + "salt.channel": "debug", + "salt.utils.event": "debug", + }, } factory = salt_mm_master_2.salt_minion_daemon( "mm-minion-2", diff --git a/tests/pytests/unit/test_minion.py b/tests/pytests/unit/test_minion.py index 3d43ed9d8083..0740904d94a3 100644 --- a/tests/pytests/unit/test_minion.py +++ b/tests/pytests/unit/test_minion.py @@ -1,6 +1,7 @@ import copy import logging import os +import uuid import pytest @@ -94,12 +95,15 @@ def test_minion_load_grains_default(minion_opts): ], ) def test_send_req_fires_completion_event(event, minion_opts): + req_id = uuid.uuid4() event_enter = MagicMock() event_enter.send.side_effect = 
event[1] event = MagicMock() event.__enter__.return_value = event_enter - with patch("salt.utils.event.get_event", return_value=event): + with patch("salt.utils.event.get_event", return_value=event), patch( + "uuid.uuid4", return_value=req_id + ): minion_opts["random_startup_delay"] = 0 minion_opts["return_retry_tries"] = 30 minion_opts["grains"] = {} @@ -123,7 +127,7 @@ def test_send_req_fires_completion_event(event, minion_opts): condition_event_tag = ( len(call.args) > 1 and call.args[1] - == f"__master_req_channel_payload/{minion_opts['master']}" + == f"__master_req_channel_payload/{req_id}/{minion_opts['master']}" ) condition_event_tag_error = "{} != {}; Call(number={}): {}".format( idx, call, call.args[1], "__master_req_channel_payload" @@ -158,11 +162,11 @@ async def test_send_req_async_regression_62453(minion_opts): minion = salt.minion.Minion(minion_opts) load = {"load": "value"} - timeout = 60 + timeout = 1 # We are just validating no exception is raised - rtn = await minion._send_req_async(load, timeout) - assert rtn is False + with pytest.raises(TimeoutError): + rtn = await minion._send_req_async(load, timeout) def test_mine_send_tries(minion_opts):