mirror of https://github.com/sgoudham/Enso-Bot.git
48 lines
1.6 KiB
Python
48 lines
1.6 KiB
Python
5 years ago
|
import asyncio
|
||
|
import inspect
|
||
|
|
||
|
|
||
|
def is_double_callable(application):
|
||
|
"""
|
||
|
Tests to see if an application is a legacy-style (double-callable) application.
|
||
|
"""
|
||
|
# Look for a hint on the object first
|
||
|
if getattr(application, "_asgi_single_callable", False):
|
||
|
return False
|
||
|
if getattr(application, "_asgi_double_callable", False):
|
||
|
return True
|
||
|
# Uninstanted classes are double-callable
|
||
|
if inspect.isclass(application):
|
||
|
return True
|
||
|
# Instanted classes depend on their __call__
|
||
|
if hasattr(application, "__call__"):
|
||
|
# We only check to see if its __call__ is a coroutine function -
|
||
|
# if it's not, it still might be a coroutine function itself.
|
||
|
if asyncio.iscoroutinefunction(application.__call__):
|
||
|
return False
|
||
|
# Non-classes we just check directly
|
||
|
return not asyncio.iscoroutinefunction(application)
|
||
|
|
||
|
|
||
|
def double_to_single_callable(application):
|
||
|
"""
|
||
|
Transforms a double-callable ASGI application into a single-callable one.
|
||
|
"""
|
||
|
|
||
|
async def new_application(scope, receive, send):
|
||
|
instance = application(scope)
|
||
|
return await instance(receive, send)
|
||
|
|
||
|
return new_application
|
||
|
|
||
|
|
||
|
def guarantee_single_callable(application):
|
||
|
"""
|
||
|
Takes either a single- or double-callable application and always returns it
|
||
|
in single-callable style. Use this to add backwards compatibility for ASGI
|
||
|
2.0 applications to your server/test harness/etc.
|
||
|
"""
|
||
|
if is_double_callable(application):
|
||
|
application = double_to_single_callable(application)
|
||
|
return application
|