Linux premium155.web-hosting.com 4.18.0-513.11.1.lve.el8.x86_64 #1 SMP Thu Jan 18 16:21:02 UTC 2024 x86_64
LiteSpeed
: 162.0.235.200 | : 13.59.183.186
Cant Read [ /etc/named.conf ]
7.4.33
varifktc
www.github.com/MadExploits
Terminal
AUTO ROOT
Adminer
Backdoor Destroyer
Linux Exploit
Lock Shell
Lock File
Create User
CREATE RDP
PHP Mailer
BACKCONNECT
UNLOCK SHELL
HASH IDENTIFIER
CPANEL RESET
CREATE WP USER
README
+ Create Folder
+ Create File
/
opt /
alt /
python38 /
lib64 /
python3.8 /
unittest /
[ HOME SHELL ]
Name
Size
Permission
Action
__pycache__
[ DIR ]
drwxr-xr-x
__init__.py
3.19
KB
-rw-r--r--
__main__.py
472
B
-rw-r--r--
async_case.py
5.67
KB
-rw-r--r--
case.py
58.19
KB
-rw-r--r--
loader.py
22.17
KB
-rw-r--r--
main.py
10.97
KB
-rw-r--r--
mock.py
96.01
KB
-rw-r--r--
result.py
7.27
KB
-rw-r--r--
runner.py
7.58
KB
-rw-r--r--
signals.py
2.35
KB
-rw-r--r--
suite.py
12.51
KB
-rw-r--r--
util.py
5.09
KB
-rw-r--r--
Delete
Unzip
Zip
${this.title}
Close
Code Editor : async_case.py
import asyncio
import inspect

from .case import TestCase


class IsolatedAsyncioTestCase(TestCase):
    """TestCase whose fixtures and test methods may be coroutines.

    Every ``run()`` creates a fresh event loop (with debug mode enabled),
    executes the test on it and closes the loop afterwards.
    """

    # Names intentionally have a long prefix
    # to reduce a chance of clashing with user-defined attributes
    # from inherited test case
    #
    # The class doesn't call loop.run_until_complete(self.setUp()) and family
    # but uses a different approach:
    # 1. create a long-running task that reads self.setUp()
    #    awaitable from queue along with a future
    # 2. await the awaitable object passing in and set the result
    #    into the future object
    # 3. Outer code puts the awaitable and the future object into a queue
    #    with waiting for the future
    # The trick is necessary because every run_until_complete() call
    # creates a new task with embedded ContextVar context.
    # To share contextvars between setUp(), test and tearDown() we need to
    # execute them inside the same task.

    # Note: the test case modifies event loop policy if the policy was not
    # instantiated yet.
    # asyncio.get_event_loop_policy() creates a default policy on demand but
    # never returns None
    # I believe this is not an issue in user level tests but python itself
    # for testing should reset a policy in every test module
    # by calling asyncio.set_event_loop_policy(None) in tearDownModule()

    def __init__(self, methodName='runTest'):
        """Initialise the test case; the loop and queue are created by run()."""
        super().__init__(methodName)
        self._asyncioTestLoop = None
        self._asyncioCallsQueue = None

    async def asyncSetUp(self):
        """Coroutine fixture awaited after ``setUp()``; no-op by default."""
        pass

    async def asyncTearDown(self):
        """Coroutine fixture awaited before ``tearDown()``; no-op by default."""
        pass

    def addAsyncCleanup(self, func, /, *args, **kwargs):
        """Register *func* (expected to produce an awaitable) as a cleanup."""
        # A trivial trampoline to addCleanup()
        # the function exists because it has a different semantics
        # and signature:
        # addCleanup() accepts regular functions
        # but addAsyncCleanup() accepts coroutines
        #
        # We intentionally don't add inspect.iscoroutinefunction() check
        # for func argument because there is no way
        # to check for async function reliably:
        # 1. It can be "async def func()" itself
        # 2. Class can implement "async def __call__()" method
        # 3. Regular "def func()" that returns awaitable object
        self.addCleanup(*(func, *args), **kwargs)

    # The four _call* methods below are presumably hooks invoked by
    # TestCase.run() in .case -- that caller is not visible from this module.

    def _callSetUp(self):
        """Run the synchronous ``setUp()`` first, then ``asyncSetUp()``."""
        self.setUp()
        self._callAsync(self.asyncSetUp)

    def _callTestMethod(self, method):
        """Run the test method, awaiting its result when it is awaitable."""
        self._callMaybeAsync(method)

    def _callTearDown(self):
        """Run ``asyncTearDown()`` first, then the synchronous ``tearDown()``."""
        self._callAsync(self.asyncTearDown)
        self.tearDown()

    def _callCleanup(self, function, *args, **kwargs):
        """Run a registered cleanup, awaiting its result when awaitable."""
        self._callMaybeAsync(function, *args, **kwargs)

    def _callAsync(self, func, /, *args, **kwargs):
        """Call *func*, which must return an awaitable, and wait for it.

        The awaitable is handed to the runner task through the queue and the
        loop is driven until the paired future completes; that future's
        result is returned (or its exception raised).
        """
        assert self._asyncioTestLoop is not None
        ret = func(*args, **kwargs)
        assert inspect.isawaitable(ret)
        fut = self._asyncioTestLoop.create_future()
        self._asyncioCallsQueue.put_nowait((fut, ret))
        return self._asyncioTestLoop.run_until_complete(fut)

    def _callMaybeAsync(self, func, /, *args, **kwargs):
        """Call *func*; await the result via the runner task if awaitable.

        A non-awaitable return value is passed straight back to the caller.
        """
        assert self._asyncioTestLoop is not None
        ret = func(*args, **kwargs)
        if inspect.isawaitable(ret):
            fut = self._asyncioTestLoop.create_future()
            self._asyncioCallsQueue.put_nowait((fut, ret))
            return self._asyncioTestLoop.run_until_complete(fut)
        else:
            return ret

    async def _asyncioLoopRunner(self, fut):
        """Long-running task that awaits every queued awaitable in turn.

        *fut* is resolved as soon as the queue exists so that the caller
        knows the runner is ready. Each queue item is a ``(future, awaitable)``
        pair; ``None`` is the sentinel that stops the runner.
        """
        self._asyncioCallsQueue = queue = asyncio.Queue()
        fut.set_result(None)
        while True:
            query = await queue.get()
            queue.task_done()
            if query is None:
                return
            fut, awaitable = query
            try:
                ret = await awaitable
                if not fut.cancelled():
                    fut.set_result(ret)
            except (SystemExit, KeyboardInterrupt):
                # Let interpreter-exit requests propagate out of the task.
                raise
            except (BaseException, asyncio.CancelledError) as ex:
                # Everything else is reported to the waiting caller
                # through the paired future.
                if not fut.cancelled():
                    fut.set_exception(ex)

    def _setupAsyncioLoop(self):
        """Create the per-test event loop and start the runner task on it."""
        assert self._asyncioTestLoop is None
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.set_debug(True)
        self._asyncioTestLoop = loop
        fut = loop.create_future()
        self._asyncioCallsTask = loop.create_task(self._asyncioLoopRunner(fut))
        # Drive the loop until the runner has created the calls queue.
        loop.run_until_complete(fut)

    def _tearDownAsyncioLoop(self):
        """Stop the runner, cancel leftover tasks and close the loop."""
        assert self._asyncioTestLoop is not None
        loop = self._asyncioTestLoop
        self._asyncioTestLoop = None
        # None is the sentinel that makes the runner task return.
        self._asyncioCallsQueue.put_nowait(None)
        loop.run_until_complete(self._asyncioCallsQueue.join())

        try:
            # cancel all tasks
            to_cancel = asyncio.all_tasks(loop)
            if to_cancel:
                for task in to_cancel:
                    task.cancel()

                # gather() takes the loop from the tasks themselves; its
                # explicit ``loop`` argument is deprecated since Python 3.8
                # and removed in 3.10, so it is not passed here.
                loop.run_until_complete(
                    asyncio.gather(*to_cancel, return_exceptions=True))

                for task in to_cancel:
                    if task.cancelled():
                        continue
                    if task.exception() is not None:
                        loop.call_exception_handler({
                            'message': 'unhandled exception during test shutdown',
                            'exception': task.exception(),
                            'task': task,
                        })

            # shutdown asyncgens -- done even when there were no tasks left
            # to cancel, so async generators are finalised before the loop
            # is closed.
            loop.run_until_complete(loop.shutdown_asyncgens())
        finally:
            asyncio.set_event_loop(None)
            loop.close()

    def run(self, result=None):
        """Run the test on a freshly created loop and always tear it down."""
        self._setupAsyncioLoop()
        try:
            return super().run(result)
        finally:
            self._tearDownAsyncioLoop()
Close