Source code for toui._signals
"""
A module that creates instructions ("signals") used to communicate with JavaScript.
"""
import inspect
import json
from toui._helpers import debug, info
from copy import copy
from functools import wraps
class Signal:
    """
    Decorator that turns a Python method call into an instruction ("signal") sent to JavaScript.

    An instance is applied to a method of an object that exposes `_signal_mode` and `_app`
    attributes. The wrapper first runs the decorated method. If signal mode is on and the
    app's class name is listed in `app_types`, it then looks on the decorator itself for a
    plain function with the same name as the decorated method, calls it to build the signal
    dictionary and sends that dictionary through the websocket.

    NOTE(review): the same-named builder functions are presumably defined as static methods
    on subclasses of this class (`_call_method` only considers members for which
    `inspect.isfunction` is true) -- confirm against the subclasses.

    NOTE(review): `object` and `ws` are stored on the decorator instance, which is shared by
    every object using the decorated method; concurrent calls overwrite each other's values.

    Attributes
    ----------
    included_private_methods
        Underscore-prefixed builder names that `_call_method` is still allowed to dispatch.
    no_return_functions
        Names of decorated functions whose wrapper always returns ``None``.
    """

    included_private_methods = ["_open_another_page"]
    no_return_functions = []

    def __init__(self, return_type=None, app_types=None):
        """
        Parameters
        ----------
        return_type
            When equal to ``"js"``, the wrapper returns the reply received from JavaScript
            instead of the decorated method's own return value.
        app_types
            Class names of the apps for which signals are sent. Defaults to
            ``['Website', 'DesktopApp']``.
        """
        self.ws = None
        self.return_type = return_type
        # A fresh list per instance: a list literal used as the default would be shared
        # between every decorator created without an explicit `app_types`.
        self.app_types = ['Website', 'DesktopApp'] if app_types is None else app_types

    def __call__(decorator, func):
        """Wrap `func` so that calling it also emits the matching signal."""
        decorator._func = func

        @wraps(func)
        def new_func(self, *args, **kwargs):
            decorator.object = self
            # Snapshot of the object before the call. It stays ``None`` when signal mode
            # is off at this point, so the name is always bound even if the wrapped call
            # switches signal mode on.
            original_copy = copy(self) if self._signal_mode else None
            value = decorator._func(self, *args, **kwargs)
            real_output = value
            if self._signal_mode and self._app.__class__.__name__ in decorator.app_types:
                owner_name = self.__class__.__name__
                if owner_name == 'Element':
                    decorator.ws = self._parent_page.__dict__.get("_ws")
                elif owner_name == "Page":
                    decorator.ws = self.__dict__.get("_ws")
                # Normalise positional/keyword arguments (defaults included) into one
                # mapping that is handed to the signal builder as keyword arguments.
                bound = inspect.signature(decorator._func).bind(self, *args, **kwargs)
                bound.apply_defaults()
                call_kwargs = bound.arguments
                call_kwargs['return_value'] = value
                call_kwargs['object'] = call_kwargs.pop('self')
                call_kwargs['original_copy'] = original_copy
                real_output = decorator._call_method(decorator._func, **call_kwargs)
            if func.__name__ in decorator.no_return_functions:
                return None
            if decorator.return_type == "js":
                return real_output
            return value

        return new_func

    def _call_method(self, func_, **kwargs):
        """
        Call the signal builder named like `func_` and send whatever it returns.

        Only plain-function members of this object are considered. Names starting with an
        underscore are skipped unless listed in `included_private_methods`. Returns the
        result of `_send` when the builder returns a truthy signal, otherwise ``None``.
        """
        wanted = func_.__name__
        for method_name, method in inspect.getmembers(self, inspect.isfunction):
            if method_name != wanted:
                continue
            if method_name.startswith("_") and method_name not in self.included_private_methods:
                continue
            signal = method(**kwargs)
            if signal:
                return self._send(signal)
            return None
        return None

    def _send(self, signal):
        """
        Send `signal` through the websocket and, for ``return_type == "js"``, wait for the reply.

        The websocket's message counter is incremented and stored under
        ``signal['kwargs']['msg-num']`` before sending.

        Returns
        -------
        ``None`` when no reply is expected or the received data fails validation; a list of
        `File` objects when the reply's ``type`` is ``"files"``; otherwise the reply's
        ``data`` entry.
        """
        msg_num = self.ws.msg_num = self.ws.msg_num + 1
        signal['kwargs']['msg-num'] = msg_num
        self.ws.send(json.dumps(signal))
        debug(f"SENT: {signal}")
        if self.return_type != "js":
            return None
        data_dict = self._receive_reply(msg_num)
        if data_dict is None:
            return None
        debug(f"Message number: {msg_num} found")
        debug(f"RECEIVED DATA KEYS: {list(data_dict.keys())}")
        if data_dict['type'] == "files":
            return [
                File(file_dict, signal, self.object._app, file_dict['file-id'], ws=self.ws)
                for file_dict in data_dict['data']
            ]
        return data_dict['data']

    def _receive_reply(self, msg_num):
        """
        Block until the message numbered `msg_num` is available and return it as a dict.

        Messages with a different number are stashed on the websocket (`pending_pages` for
        ``type == "page"``, `pending_messages` otherwise). Returns ``None`` as soon as a
        received payload fails the app's `_validate_data` check.
        """
        while True:
            data_from_js = self.ws.receive()
            debug("DATA RECEIVED")
            if not self.object._app._validate_data(data_from_js):
                info("Data validation returns `False`. The data will not be used.")
                return None
            data_dict = json.loads(data_from_js)
            if data_dict.get("msg-num") == msg_num:
                return data_dict
            if data_dict.get("type") == "page":
                debug("Adding to pending pages")
                self.ws.pending_pages.append(data_dict)
            else:
                self.ws.pending_messages[data_dict.get("msg-num")] = data_dict
            debug(f"Non-matching message number: {data_dict.get('msg-num')}, checking for other messages..")
            if msg_num in self.ws.pending_messages:
                return self.ws.pending_messages.pop(msg_num)
            # Logged only on a real miss; previously it was emitted even when the reply
            # had just been found.
            debug("Could not find message number")
class File:
    """
    Contains the information of an uploaded file and can be used to save the file contents.

    This object should only be created through `Element.get_files()` method.

    Iterating over a `File` sends a ``_saveFile`` request through the websocket and yields
    the file data chunk by chunk as the replies arrive.

    Attributes
    ----------
    name
        Name of the file.
    type
        Type of the file.
    size
        Size of the file.
    content: str or None
        Content of the file. If the parameter `with_contents` in the method `Element.get_files` was set as ``True``,
        you will be able to access the content using this attribute. Otherwise, the value of this attribute will be
        ``None``.
    last_modified
        The last modified date as the number of milliseconds since the Unix epoch (January 1, 1970 at midnight).
    is_binary: bool
        Set the value of this attribute to ``True`` only if you want the file content to be converted to bytes
        before saving it.

    See Also
    --------
    Element.get_files()
    """

    def __init__(self, file_dict, signal, app, file_id, ws=None):
        """
        Parameters
        ----------
        file_dict
            Mapping with the keys ``name``, ``size``, ``file-type``, ``last-modified`` and,
            optionally, ``content``.
        signal
            The signal dictionary whose reply produced this file.
        app
            The app object; its `_validate_data` method is used to check incoming data.
        file_id
            Identifier sent back to JavaScript when the file data is requested.
        ws
            The websocket used to request and receive the file data.
        """
        self._file_dict = file_dict
        self.name = file_dict['name']
        self.size = file_dict['size']
        self.type = file_dict['file-type']
        # `content` is only present when the caller asked for the contents up front.
        self.content = file_dict.get('content')
        self.last_modified = file_dict['last-modified']
        self._ws = ws
        self._id = file_id
        self._app = app
        self._signal = signal
        self.is_binary = False

    def __iter__(self):
        """
        Request the file data from JavaScript and yield it one chunk at a time.

        Each chunk is the ``data`` entry of a reply (converted with `bytearray` when
        `is_binary` is true). Iteration stops after the reply whose ``end`` entry is
        truthy, or as soon as no valid reply can be obtained.
        """
        signal = {
            'func': "_saveFile",
            'args': [],
            'kwargs': {'file-id': self._id, 'binary': self.is_binary},
        }
        msg_num = self._ws.msg_num = self._ws.msg_num + 1
        signal['kwargs']['msg-num'] = msg_num
        self._ws.send(json.dumps(signal))
        debug(f"SENT: {signal}")
        while True:
            data_dict = self._get_valid_message(msg_num)
            if data_dict is None:
                break
            data = data_dict['data']
            if self.is_binary:
                data = bytearray(data)
            yield data
            if data_dict['end']:
                break

    def __repr__(self):
        return f"<File {self.name}>"

    def _get_valid_message(self, msg_num):
        """
        Return the next message numbered `msg_num` as a dict, or ``None`` if validation fails.

        A message already stashed in the websocket's `pending_messages` is returned first,
        without waiting on the socket. Otherwise messages are received until one matches;
        non-matching ones are stashed (`pending_pages` for ``type == "page"``,
        `pending_messages` otherwise).
        """
        # Another reader on the same websocket may already have received and stashed our
        # reply. Deliver it before blocking on `receive()`, so it is neither delayed nor
        # overtaken by a later chunk.
        if msg_num in self._ws.pending_messages:
            return self._ws.pending_messages.pop(msg_num)
        while True:
            data_from_js = self._ws.receive()
            debug("DATA RECEIVED")
            if not self._app._validate_data(data_from_js):
                info("Data validation returns `False`. The data will not be used.")
                return None
            data_dict = json.loads(data_from_js)
            if data_dict.get("msg-num") == msg_num:
                break
            if data_dict.get("type") == "page":
                debug("Adding to pending pages")
                self._ws.pending_pages.append(data_dict)
            else:
                self._ws.pending_messages[data_dict.get("msg-num")] = data_dict
            debug(f"Non-matching message number: {data_dict.get('msg-num')}, checking for other messages..")
            if msg_num in self._ws.pending_messages:
                data_dict = self._ws.pending_messages.pop(msg_num)
                break
        debug(f"Message number: {msg_num} found")
        return data_dict

    def save(self, stream):
        """
        Saves the contents of the file to a stream.

        Parameters
        ----------
        stream
            An object with ``write`` and ``flush`` methods, for example an open file.
            When `is_binary` is ``True`` the chunks are `bytearray` objects, so the
            stream must accept bytes (e.g. a file opened with mode ``"wb"``).

        Examples
        --------
        Saving a `File` object to a stream.

        >>> def saveFile():
        ...     pg = app.get_user_page()
        ...     input_element = pg.get_element("file-element") # Assuming 'file-element' is an id of an element that uploads files
        ...     files = input_element.get_files()
        ...     for file in files:
        ...         with open(file.name, "w") as stream:
        ...             file.save(stream)
        """
        for data in self:
            stream.write(data)
            # Flush per chunk so received data reaches the stream as it arrives.
            stream.flush()