"""
A module that creates web apps and desktop apps.
"""
import __main__
import threading
import json
import uuid
import time
import os
import requests
from urllib.parse import urlparse
from urllib.parse import parse_qs
from copy import copy
from abc import ABCMeta, abstractmethod
from collections import UserDict
from collections.abc import MutableMapping
from functools import wraps
from typing import Any, Union
from flask import Flask, session, request, send_file, make_response, redirect
from flask_sock import Sock
import webview
from toui._helpers import warn, info, debug, error
from toui.pages import Page
from toui.exceptions import ToUIWrongPlaceException, ToUINotAddedError, ToUIOverlapException
from toui._defaults import validate_ws, validate_data
_imported_optional_reqs = {'flask-login':False,
'flask-sqlalchemy':False,
'flask-basicauth':False,
'firebase_admin': False,
'stripe': False}
try:
from flask_login import LoginManager, UserMixin, current_user, login_user, logout_user, AnonymousUserMixin
_imported_optional_reqs['flask-login'] = True
except ModuleNotFoundError: pass
try:
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy.exc import OperationalError
_imported_optional_reqs['flask-sqlalchemy'] = True
except ModuleNotFoundError: pass
try:
from flask_basicauth import BasicAuth
_imported_optional_reqs['flask-basicauth'] = True
except ModuleNotFoundError: pass
try:
import firebase_admin
import firebase_admin.db
import firebase_admin.firestore
import firebase_admin.credentials
import firebase_admin.storage
import firebase_admin.auth
from google.cloud.firestore_v1.base_query import FieldFilter, And
_imported_optional_reqs['firebase_admin'] = True
except ModuleNotFoundError: pass
try:
import stripe
_imported_optional_reqs['stripe'] = True
except ModuleNotFoundError: pass
class _ReqsChecker:
def __init__(self, reqs) -> None:
self.reqs = reqs
def __call__(self, func) -> Any:
@wraps(func)
def new_func(*args, **kwargs):
for req in self.reqs:
if _imported_optional_reqs[req]:
return func(*args, **kwargs)
else:
raise ModuleNotFoundError(f"You have not installed the optional package `{req}` yet, to install it run:\n\tpip install {req}")
return new_func
class _App(metaclass=ABCMeta):
"""The base class for DesktopApp and Website"""
def __init__(self, name=None, assets_folder=None, secret_key=None, vars_timeout=86400, gen_sid_algo=None):
"""
Parameters
----------
name: str (optional)
The name of the app.
assets_folder: str (optional)
The path to the folder that contains the HTML files and other assets.
secret_key: str (optional)
Sets the `secret_key` attribute for `flask.Flask`. You can also set the environment variable SECRET_KEY from
the command line and ToUI will get it using `os.environ`.
vars_timeout: int (optional)
The timeout interval before the temporary user-specific variables are deleted from `user_vars` attribute.
The default is 86400 seconds (1 day).
gen_sid_algo: Callable (optional)
A callable that generates a unique user id so that ToUI can store data for each user/browser. If ``None``, the
IP address, user agent, and secret key will be used.
Attributes
----------
flask_app: Flask
ToUI creates applications using `Flask`. You can access the `Flask` object using the attribute `flask_app`.
forbidden_urls: list
These are URLs that ToUI does not allow the user to use because ToUI uses them.
user_vars: dict
A dictionary that stores temporary data unique to each user.
page_vars: dict
A dictionary that stores data that is deleted when the user leaves the page.
pages: list
A list of added `Page` objects.
firestore: firebase_admin.firestore.Firestore
If firebase was added using `add_firebase`, you can access firestore database using this attribute.
Otherwise, it will be ``None``. See `Firebase documentation <https://firebase.google.com/docs/firestore>`_ and
`Firestore API Reference <https://cloud.google.com/python/docs/reference/firestore/latest/admin_client>`_.
.. admonition:: Behind The Scenes
:class: tip
ToUI uses `Flask` and its extenstions to create apps. When creating an instance of this class, the following
extensions are used:
- `Sock` class extension from `Flask-Sock` package.
"""
self._functions = {}
if not name:
if hasattr(__main__, "__file__"):
name = os.path.basename(__main__.__file__).split(".")[0]
else:
name = "app"
if not assets_folder:
assets_folder = "/"
self.flask_app = Flask(name, static_folder=assets_folder, static_url_path="/")
if secret_key is not None:
self.flask_app.secret_key = secret_key
elif os.environ.get('SECRET_KEY') is not None:
self.flask_app.secret_key = os.environ.get('SECRET_KEY')
else:
warn("No secret key was set. Generating a random secret key for Flask.")
self.flask_app.secret_key = os.urandom(50)
self.pages = []
self._add_communication_method()
self._add_user_vars(timeout_interval=vars_timeout, gen_sid_algo=gen_sid_algo)
self.flask_app.route("/toui-download-<path_id>", methods=['POST', 'GET'])(self._download)
self.flask_app.route("/toui-google-sign-in", methods=['POST', 'GET'])(self._sign_in_using_google)
self.forbidden_urls = ['/toui-communicate', "/toui-download-<path_id>", "/toui-google-sign-in"]
self.firestore = None
self._validate_ws = validate_ws
self._validate_data = validate_data
self._auth = None
self._user_cls = None
self._user_db_type = None
self._firebase_app = None
self._firebase_users_db = None
self._google_data = {}
@abstractmethod
def run(self): pass
def add_pages(self, *pages, do_copy=False, blueprint=None, endpoint=None):
"""
Adds pages to the app.
Parameters
----------
pages: list(Page)
List of `Page` objects.
do_copy: bool, default = False
If ``True``, the `Page` will be copied before adding to the app.
blueprint: toui.structure.ToUIBlueprint, flask.Blueprint, default = None
If a `flask.Blueprint` or a `ToUIBlueprint` was added, the `Page` view_func will be added
to the blueprint instead of the `Flask` app.
endpoint
`endpoint` parameter in `flask.Flask.route`. If ``None``, the endpoint will be set
as the unique id of the `Page`. The unique id is obtained through the ``id()``
function and converted to a string.
"""
for page in pages:
if page.url in self.forbidden_urls:
warn(f"The URL '{page.url}' is not allowed and might cause errors.")
if do_copy:
page = copy(page)
page._app = self
page._add_script()
self.pages.append(page)
view_func = page._view_func
if self._auth:
view_func = self._auth.required(view_func)
if not endpoint:
endpoint_ = str(id(page))
else:
endpoint_ = endpoint
if blueprint:
route = blueprint.route(page.url, methods=['GET', 'POST'], endpoint=endpoint_)(view_func)
else:
route = self.flask_app.route(page.url, methods=['GET', 'POST'], endpoint=endpoint_)(view_func)
def open_new_page(self, url, new=False, different_origin=False):
"""
Opens another URL.
This function should only be called after the app starts running.
Parameters
----------
url: str
URL of the new page.
new: bool, default=False
If ``True``, the URL will be opened in a new tab/window.
different_origin: bool, default=False
Set it as ``True`` only if the URL includes a different origin.
Returns
-------
None
"""
try:
session.keys()
self.get_user_page()._open_another_page(url, new=new, different_origin=different_origin)
except RuntimeError:
raise ToUIWrongPlaceException(f"The function `open_new_page` should only be called after the app runs.")
@staticmethod
def get_user_page() -> Page:
"""
A static method that returns the current `Page`.
This function should only be called after the app starts running.
Returns
-------
pg: Page
"""
try:
return session['user page']
except RuntimeError as e:
raise ToUIWrongPlaceException(f"The function `get_user_page` should only be called after the app runs.")
def get_current_url(self):
"""
A method that returns the current full URL.
This function should only be called after the app starts running.
Returns
-------
str
"""
try:
return session['toui-request-url']
except RuntimeError as e:
raise ToUIWrongPlaceException(f"The function `get_current_url` should only be called after the app runs.")
def get_query_params(self, url=None):
"""
A method that returns the query parameters of the current URL.
Parameters
----------
url: str, default=None
If specified, the method will return the parameters of this argument. Otherwise, the method will return the
query parameters of the current URL.
Returns
-------
dict
Each key in the dictionary is the name of the parameter, and each value is a list.
"""
try:
if url is None:
url = self.get_current_url()
params = parse_qs(urlparse(url).query)
return params
except RuntimeError as e:
raise ToUIWrongPlaceException(f"The function `get_query_params` with no arguments should only be called after the app runs.")
@property
def user_vars(self):
"""Gets user-specific variables."""
return self._user_vars
@property
def page_vars(self):
"""Gets and sets variables that are deleted when user leaves the page."""
return session['toui-page-vars']
def download(self, filepath, new=True):
"""
Downloads a file from the server to a client.
Parameters
----------
filepath: str
The path to the file (on the server).
new: bool, default=True
Opens new tab/window when downloading file.
"""
path_id = 0
while self._user_vars._get(f'toui-download-{path_id}'):
path_id += 1
self._user_vars._set(f'toui-download-{path_id}', filepath)
self.open_new_page(f"/toui-download-{path_id}", new=new)
@_ReqsChecker(['firebase_admin'])
def add_firebase(self, firebase_config: Union[dict, str], **options):
"""
Adds Firebase to the app.
Parameters
----------
firebase_config: dict, str
The Firebase configuration dictionary or the path of credentials JSON.
options (optional)
Extra options for initializing Firebase. See `Google's documentation <https://firebase.google.com/docs/reference/admin/python/firebase_admin#initialize_app>`_.
"""
certificate = firebase_admin.credentials.Certificate(firebase_config)
self._firebase_app = firebase_admin.initialize_app(certificate, options)
self.firestore = firebase_admin.firestore.client()
def store_file_using_firebase(self, destination_path, file_path, bucket_name=None):
"""
Uploads a file to Firebase storage.
Parameters
----------
destination_path: str
The path of the file in Firebase storage.
file_path: str
The path of the file on the server.
bucket_name: str, default = None
The name of the bucket. If ``None``, the default bucket will be used. However, if you did not specify "storageBucket"
option in the Firebase configuration, you must specify the bucket name in this function.
Returns
-------
None
"""
bucket = firebase_admin.storage.bucket(name=bucket_name)
blob = bucket.blob(destination_path)
blob.upload_from_filename(file_path)
def get_file_from_firebase(self, source_path, new_file_path, bucket_name=None):
"""
Downloads a file from Firebase storage.
Parameters
----------
source_path: str
The path of the file in Firebase storage.
new_file_path: str
The path of the file on the server.
bucket_name: str, default = None
The name of the bucket. If ``None``, the default bucket will be used. However, if you did not specify "storageBucket"
option in the Firebase configuration, you must specify the bucket name in this function.
Returns
-------
None
"""
bucket = firebase_admin.storage.bucket(name=bucket_name)
blob = bucket.blob(source_path)
blob.download_to_filename(new_file_path)
@_ReqsChecker(['flask-sqlalchemy', 'flask-login'])
def add_user_database_using_sql(self, database_uri, other_columns=[], user_cls=None, table_name="users"):
"""
Creates a simple database that has data specific to each user.
The database has a table that contains the following columns: `username`, `password`, and `id`. To add other columns,
add their names in `other_columns` list. Note that this is different from `user_vars` which is a stores temporary
data without the need to sign in.
Warning
-------
A table called `users` (or whatever you input as table_name) will be created in the database.
If you already have a table with the same name, it might be overwritten.
Parameters
----------
database_uri: str
The URI of the database that you want to connect to.
other_columns: list
The names of table columns other than `username`, `password`, `email` and `id`.
user_cls: Callable, default=None
If this parameter is ``None``, a table called `users` will be created. However, if this parameter was set, the
table `users` will not be created and the parameter `user_cls` will be used instead.
table_name: str, default="users"
The name of the table that will be created/modified in the database.
.. admonition:: Behind The Scenes
:class: tip
The following flask extensions are used when calling this function:
- `SQLAlchemy` class extension from `Flask-SQLAlchemy` package.
- `LoginManager` class extension from `Flask-Login` package.
The following `Flask` configurations are also set:
- `SQLALCHEMY_DATABASE_URI = database_uri`
"""
if table_name == "users":
UserWarning("Starting from version 4.0.0, the table_name parameter will be required.")
if self._user_db_type == "firebase":
raise ToUIOverlapException("This function cannot be called when using Firebase user database.")
self._user_db_type = "sql"
self.flask_app.config['SQLALCHEMY_DATABASE_URI'] = database_uri
self._db = SQLAlchemy(self.flask_app)
self._login_manager = LoginManager(self.flask_app)
self._load_user = self._login_manager.user_loader(self._load_user)
if not user_cls:
class User(UserMixin, self._db.Model):
__tablename__ = table_name
id = self._db.Column(self._db.Integer, primary_key=True)
username = self._db.Column(self._db.String, nullable=False, unique=True)
email = self._db.Column(self._db.String, nullable=True, unique=True)
password = self._db.Column(self._db.String, nullable=True, unique=False)
def __repr__(self):
return f'<User {self.username}>'
for col in other_columns:
setattr(User, col, self._db.Column(self._db.String))
else:
User = user_cls
self._user_cls = User
with self.flask_app.app_context():
try:
self._db.create_all()
except OperationalError as err:
error(f"OperationalError while creating the database: {err}")
@_ReqsChecker(['firebase_admin'])
def add_user_database_using_firebase(self, collection_name="users"):
"""
Adds Firebase user database to the app.
Make sure you create a firestore database (not realtime) in your Firebase app.
Warning
-------
A collection called `users` (or whatever you input as collection_name) will be created in the database.
If you already have a collection with the same name, it might be overwritten.
Parameters
----------
collection_name: str, default="users"
The name of the collection that will be created/modified in the database.
.. admonition:: Behind The Scenes
:class: tip
Firebase authentication and database are used when calling this function.
"""
if collection_name == "users":
UserWarning("Starting from version 4.0.0, the collection_name parameter will be required.")
if self._firebase_app is None:
raise ToUINotAddedError("Firebase is not added to the app. Use `add_firebase` to add it.")
if self._user_db_type == "sql":
raise ToUIOverlapException("This function cannot be called when using SQL user database.")
self._user_db_type = "firebase"
self._firebase_users_db = firebase_admin.firestore.client().collection("users")
# Website-specific methods
@staticmethod
def get_request():
"""
A static method that gets data sent from client using HTTP request.
This method returns the `request` object of `Flask`. The `request` object has some useful attributes such as
`request.files` which retrieves uploaded files.
Examples
--------
To use this method, first create the app and a page:
>>> app = Website(__name__, secret_key="some key")
>>> home_page = Page(html_str=\"\"\"<form method="post" enctype="multipart/form-data">
... <input type="file" name="filename">
... <input type="submit">
... </form>\"\"\", url="/")
Then create a function that will be called when an HTTP request is made:
>>> def request_function():
... request = app.get_request()
... print(request.files)
Now add the function to `Page.on_url_request()` method:
>>> home_page.on_url_request(request_function)
Add the page to the app and run the app:
>>> app.add_pages(home_page)
>>> if __name__ == "__main__":
... app.run() # doctest: +SKIP
Returns
-------
flask.request
See Also
--------
flask.request
https://flask.palletsprojects.com/en/2.2.x/api/#flask.request
Page.on_url_request
"""
return request
def redirect_response(self, url, code=302, Response=None):
"""
Use it with `Page.on_url_request` to redirect the user to another page.
`Page.on_url_request` is called when the user makes an HTTP request to the page. If you want to redirect the user
to another page, define a function then use this method as the return value of a function. Then add the function to
`Page.on_url_request` and set `display_return_value` as ``True``.
Examples
--------
>>> def redirect_function():
... return app.redirect_response("/another-page-url")
>>> home_page.on_url_request(redirect_function, display_return_value=True)
Parameters
----------
url: str
The URL of the page that the user will be redirected to.
code: int, default=302
response: flask.Response, default=None
"""
return redirect(url, code=code, Response=Response)
def signup_user(self, username, password=None, email=None, **other_info):
"""
Creates a new user in the database.
Parameters
----------
username: str
password: str
email: str
other_info
Other information required for signing up.
Returns
-------
bool
``True`` if the user is created, and ``False`` if it is not created.
"""
self._confirm_user_database_created()
if self.username_exists(username) or self.email_exists(email):
return False
if self._user_db_type == "sql":
new_user = self._user_cls(username=username, password=password, email=email, **other_info)
self._db.session.add(new_user)
self._db.session.commit()
elif self._user_db_type == "firebase":
if password is not None:
if len(password) < 6:
error("Password for Firebase authentication needs to be at least 6 characters")
return False
user_record = firebase_admin.auth.create_user(password=password, display_name=username, email=email)
self._firebase_users_db.document(user_record.uid).set({"username": username, 'password': password,
'email': email, **other_info})
if self.username_exists(username) or self.email_exists(email):
return True
else:
return False
def signin_user(self, username, password=None, email=None, **other_info):
"""
Loads the data of a user from database.
Parameters
----------
username: str
password: str
other_info
Other information required for signing in.
Returns
-------
bool
``True`` if the user is signed in, and ``False`` if it is not signed in.
"""
self._confirm_user_database_created()
if not self.username_exists(username) and not self.email_exists(email):
return False
if self._user_db_type == "sql":
filter = {"username": username, **other_info}
if email is not None:
filter["email"] = email
if password is not None:
filter["password"] = password
user = self._user_cls.query.filter_by(**filter).first()
if user:
login_user(user)
self._user_vars._set("user-id", user.id)
return True
else:
return False
elif self._user_db_type == "firebase":
users = self._firebase_users_db.where("username", "==", username).get()
if email is not None:
users = [user for user in users if user.get("email") == email]
if len(users) == 0:
return False
if password is not None:
users = [user for user in users if user.get("password") == password]
if len(users) == 0:
return False
return self.signin_user_from_id(users[0].id, **other_info)
def get_users_ids_from_data(self, **data):
"""
Gets the user ids from the users data.
This can be used to sign-in the users later using `signin_user_from_id`.
Warning
-------
This function will return a list of user ids. If the aim is to get a single user id, make sure that the length
of the list is 1.
Parameters
----------
data
The data (keyword arguments) that will be used to get the user ids.
Returns
-------
list
A list of user ids.
"""
self._confirm_user_database_created()
if self._user_db_type == "sql":
filter = {**data}
users = self._user_cls.query.filter_by(**filter).all()
return [user.id for user in users]
elif self._user_db_type == "firebase":
filter = And(filters=[FieldFilter(key, "==", value) for key, value in data.items()])
users = self._firebase_users_db.where(filter=filter).get()
return [user.id for user in users]
def signin_user_from_id(self, user_id, **other_info):
"""
Loads the data of a user from database using the user's ID.
Parameters
----------
id: int
other_info
Other information required for signing in.
Returns
-------
bool
``True`` if the user is signed in, and ``False`` if it is not signed in.
"""
self._confirm_user_database_created()
if self._user_db_type == "sql":
user = self._user_cls.query.filter_by(id=user_id, **other_info).first()
if user:
login_user(user)
self._user_vars._set("user-id", user_id)
return True
else:
return False
elif self._user_db_type == "firebase":
user = firebase_admin.auth.get_user(user_id)
if user:
user_dict = self._firebase_users_db.document(user.uid).get().to_dict()
for key, value in other_info.items():
if user_dict.get(key) != value:
return False
self._user_vars._set("user-id", user_id)
return True
else:
return False
def get_current_user_data(self, key):
"""
Gets data specific to the currently signed in user from the database.
Parameters
----------
key: str
The key (name) of the data. For example: "username", "email", "age", etc.
Returns
-------
Any
The value of the data.
"""
self._confirm_user_database_created()
if not self.is_signed_in():
error("No user is signed in.")
return None
if self._user_db_type == "sql":
if not key in current_user.__table__.columns:
error(f"'{key}' was not added as a column in users table")
return None
return getattr(current_user, key)
elif self._user_db_type == "firebase":
return self._firebase_users_db.document(self._user_vars._get("user-id")).get().to_dict().get(key)
def set_current_user_data(self, key, value):
"""
Sets data specific to the currently signed in user in the database.
Warning
-------
Currently, if you are using SQL database, you can only set data that
was already added as a column in the users table.
Parameters
----------
key: str
The key (name) of the data. For example: "username", "email", "age", etc.
value: Any
The value of the data.
Returns
-------
bool
``True`` if the data is set, and ``False`` if it is not set.
"""
self._confirm_user_database_created()
if not self.is_signed_in():
error("No user is signed in.")
return False
if self._user_db_type == "sql":
if not key in current_user.__table__.columns:
error(f"'{key}' was not added as a column in users table")
return False
setattr(current_user, key, value)
self._db.session.commit()
return True
elif self._user_db_type == "firebase":
self._firebase_users_db.document(self._user_vars._get("user-id")).update({key: value})
return True
def get_current_user_id(self):
"""
Gets the ID of the currently signed in user.
Returns
-------
str
The ID of the user.
"""
self._confirm_user_database_created()
if not self.is_signed_in():
error("No user is signed in.")
return None
return self._user_vars._get("user-id")
def signout_user(self):
"""
A method that signs out the current user.
"""
self._confirm_user_database_created()
if self._user_db_type == "sql":
logout_user()
self._user_vars._del('user-id')
def username_exists(self, username):
"""
Checks if the username is exists in the database.
Parameters
----------
username: str
Returns
-------
bool
``True`` if the username exists, otherwise ``False``.
"""
self._confirm_user_database_created()
if self._user_db_type == "sql":
if self._user_cls.query.filter_by(username=username).first():
info(f"User {username} exists")
return True
else:
return False
elif self._user_db_type == "firebase":
if len(self._firebase_users_db.where("username", "==", username).get()) > 0:
info(f"User {username} exists")
return True
else:
return False
def email_exists(self, email):
"""
Checks if the email is exists in the database.
Parameters
----------
email: str
Returns
-------
bool
``True`` if the email exists, otherwise ``False``.
"""
self._confirm_user_database_created()
if email is None:
return False
if self._user_db_type == "sql":
if self._user_cls.query.filter_by(email=email).first():
info(f"Email {email} exists")
return True
else:
return False
elif self._user_db_type == "firebase":
if len(self._firebase_users_db.where("email", "==", email).get()) > 0:
return True
else:
return False
def is_signed_in(self):
"""
Checks if the user is signed in.
Returns
-------
bool
"""
self._confirm_user_database_created()
if self.user_vars._get('user-id'):
if self._user_db_type == "sql":
login_user(self._user_cls.query.filter_by(id=self._user_vars._get("user-id")).first())
return True
else:
return False
def sign_in_using_google(self, client_id, client_secret, after_auth_url, additional_scopes=None, custom_username=None, custom_host=None,
**other_params):
"""
Signs in a user using Google (Experimental).
Make sure to create a Google app first. Also, add the following as an authorized redirect URI to your Google app:
``https://<your-domain>/toui-google-sign-in``
Parameters
----------
client_id: str
The client ID of the Google app.
client_secret: str
The client secret of the Google app.
after_auth_url: str
The URL to redirect to after completing authentication. This is not the same as the redirect uri of the Google app, so
you do not need to register it as an authorized redirect URI in your Google app.
additional_scopes: list, default=None (optional)
By default, the user allows the app to only access non-sensitive information such as the user's name and email. If you
want to access more information, you can pass a list of scopes. For more information, see `Google's documentation <https://developers.google.com/identity/protocols/oauth2/scopes>`_.
custom_username: str, default=None (optional)
If you want to use a custom username instead of the user's email, you can pass it here.
custom_host: str, default=None (optional)
Only use this option if you need to change the scheme and host of the redirect uri. For example,
if you want to use ``http://127.0.0.1:5000`` instead of ``http://localhost:5000``, you can pass
``http://127.0.0.1:5000`` here.
other_params: kwargs (optional)
Keyword arguments that can be passed as parameters to authorization url.For more information, see
`Google's documentation <https://developers.google.com/identity/protocols/oauth2/web-server#httprest_1>`_.
"""
self._google_data = {"client_id": client_id, "client_secret": client_secret}
scope = "https://www.googleapis.com/auth/userinfo.email https://www.googleapis.com/auth/userinfo.profile"
if additional_scopes:
for s in additional_scopes:
scope += f" {s}"
url = f"/toui-google-sign-in?scope={scope}"
if custom_username:
url += f"&username={custom_username}"
self.user_vars._set('google-redirect-uri-host', custom_host)
for key, value in other_params.items():
url += f"&{key}={value}"
self.user_vars._set('google-after-auth-url', after_auth_url)
self.open_new_page(url=url)
@_ReqsChecker(['stripe'])
def checkout_using_stripe(self, secret_key, **kwargs):
"""
Opens a Stripe checkout page.
Parameters
----------
secret_key: str
The secret key of your Stripe account.
kwargs
Keyword arguments that can be passed to `stripe.checkout.Session.create <https://stripe.com/docs/api/checkout/sessions/create?lang=python>`_.
"""
session = stripe.checkout.Session.create(api_key=secret_key, **kwargs)
self.open_new_page(url=session.url, different_origin=True)
def _checkout_using_paypal(self, api_username, api_password, signature, version, sandbox=False, **kwargs):
"""
NOT USED: Opens a PayPal checkout page.
Parameters
----------
api_username: str
The API username of your PayPal account.
api_password: str
The API password of your PayPal account.
signature: str
The signature of your PayPal account.
version: str, default="204.0"
The version of the PayPal API.
sandbox: bool, default=True
If ``True``, the sandbox version of PayPal will be used. Otherwise, the live version will be used.
kwargs
Keyword arguments that can be passed to `SetExpressCheckout <https://developer.paypal.com/api/nvp-soap/set-express-checkout-nvp/>`_
excluding the following parameters: `USER`, `PWD`, `SIGNATURE`, and `METHOD`.
"""
data = {
'USER': api_username,
'PWD': api_password,
'SIGNATURE': signature,
'METHOD': 'SetExpressCheckout',
**kwargs
}
if sandbox:
nvp_url = 'https://api-3t.sandbox.paypal.com/nvp'
checkout_url = 'https://www.sandbox.paypal.com/cgi-bin/webscr?cmd=_express-checkout&token='
else:
nvp_url = 'https://api-3t.paypal.com/nvp'
checkout_url = 'https://www.paypal.com/cgi-bin/webscr?cmd=_express-checkout&token='
response = requests.post(nvp_url, data=data)
response_dict = dict(parse_qs(response.text))
debug(f"PayPal response: {response_dict}")
if 'Failure' in response_dict['ACK'] or 'FailureWithWarning' in response_dict['ACK']:
error(f"PayPal error, see the following response: {json.dumps(response_dict, indent=4)}")
token = response_dict['TOKEN']
checkout_url += token
self.open_new_page(url=checkout_url, different_origin=True)
def checkout_using_paypal(self, client_id, client_secret, sandbox=True, **kwargs):
"""
Checkout using PayPal API.
1. You need to first get the client ID and the client secret for your PayPal account. See the following
link for more information: `<https://developer.paypal.com/api/rest/>`_.
2. Read the following link to learn about the key word arguments that you need to pass to this function:
`<https://developer.paypal.com/docs/api/orders/v2/#orders_create>`_ (only the parameters
under the `REQUEST BODY` section).
3. Call this function and pass the client ID and the client secret as the first two arguments. Then pass
the keyword arguments that you learned about in step 2.
4. The function will return a dictionary. You might need to store some of the data in this dictionary
for later use. See the `Responses` section in the following link for more information:
`<https://developer.paypal.com/docs/api/orders/v2/#:~:text=payment%20with%20PayPal.-,Responses,-200A%20successful>`_.
Parameters
----------
client_id: str
The client ID of your PayPal account.
client_secret: str
The client secret of your PayPal account.
sandbox: bool, default=True
If ``True``, the sandbox version of PayPal will be used. Otherwise, the live version will be used.
kwargs
Keyword arguments that can be passed to `Create order request <https://developer.paypal.com/docs/api/orders/v2/#orders_create>`_
under the `REQUEST BODY` section.
Returns
-------
dict
The response of the PayPal API. You might need to store some of the data in this dictionary
for later use. See the `Responses` section in the following link for more information:
`Create order request <https://developer.paypal.com/docs/api/orders/v2/#:~:text=payment%20with%20PayPal.-,Responses,-200A%20successful>`_.
"""
if sandbox:
origin = "https://api-m.sandbox.paypal.com"
else:
origin = "https://api-m.paypal.com"
access_token = self._get_paypal_access_token(client_id, client_secret, origin + '/v1/oauth2/token')
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {access_token}',
}
data = kwargs
response = requests.post('https://api-m.sandbox.paypal.com/v2/checkout/orders', headers=headers, data=json.dumps(data))
response_dict = response.json()
if response.status_code != 201:
error(f"PayPal error, see the following response: {json.dumps(response_dict, indent=4)}")
else:
debug(f"PayPal response: {json.dumps(response_dict, indent=4)}")
url = None
for link in response_dict['links']:
if link['rel'] in ['approve', 'payer-action']:
url = link['href']
break
if url is None:
error(f"Could not find PayPal checkout link in the following response: {json.dumps(response_dict, indent=4)}")
else:
self.open_new_page(url=url, different_origin=True)
return response_dict
def _get_paypal_access_token(self, client_id, client_secret, url):
response = requests.post(url,
data={'grant_type': 'client_credentials'},
headers={"Content-Type": "application/x-www-form-urlencoded"},
auth=(client_id, client_secret))
debug(f"PayPal access token response: {json.dumps(response.json(), indent=4)}")
return response.json()['access_token']
@_ReqsChecker(['flask-basicauth'])
def add_restriction(self, username, password):
"""
Makes the app private.
Adds a username and password to the app.
Parameters
----------
username: str
password: str
.. admonition:: Behind The Scenes
:class: tip
When calling this method, the following `Flask` extension is used:
- `BasicAuth` class extension from `Flask-BasicAuth` package.
The following `Flask` configurations are also set:
- `BASIC_AUTH_USERNAME = username`
- `BASIC_AUTH_PASSWORD = password`
"""
self._auth = BasicAuth(self.flask_app)
self.flask_app.config['BASIC_AUTH_USERNAME'] = username
self.flask_app.config['BASIC_AUTH_PASSWORD'] = password
def set_ws_validation(self, func):
"""
Validates a WebSocket connection before sending and accepting data.
ToUI uses Flask-Sock for websocket communication. Flask-Sock generates a
`simple_websocket.ws.Server <https://simple-websocket.readthedocs.io/en/latest/api.html#the-server-class>`_
object when a connection is established. If you wanted to access this object before
sending and receiving data, input a function that has one argument `ws`. This function
should either return ``True`` or ``False``. If the function returns ``False``, no data
will be sent or received using ToUI with the client.
The `ws` argument is a `simple_websocket.ws.Server` object which you can learn about in its
`documentation <https://simple-websocket.readthedocs.io/en/latest/api.html#the-server-class>`_.
You might need to do some testing in order to explore the types of data that you can find in
this object.
Parameters
----------
func: Callable
A function that validates the Server object. It should have one argument `ws` and
should either return ``True`` or ``False``.
See Also
--------
flask_sock
simple_websockets
"""
self._validate_ws = func
def set_data_validation(self, func):
"""
Validates data received from JavaScript before using it.
ToUI receives data from JavaScript in the form of a JSON object. To validate this data
before allowing ToUI to use it, input a function that checks the data. This function
should have one argument `data` and should either return ``True`` or ``False``.
If the function returns ``False``, the data will not be used by ToUI.
You can check the structures of the data received from JavaScript in
https://toui.readthedocs.io/en/latest/how_it_works.html#instructions-sent-and-received.
Note that the structures of the JSON objects might change in future versions of ToUI.
Parameters
----------
func: Callable
A function that validates data received from JavaScript. It should have one argument
`data` and should either return ``True`` or ``False``.
See Also
--------
set_ws_validation
"""
self._validate_data = func
def register_toui_blueprint(self, blueprint, **options):
"""
Registers a `ToUIBlueprint` object. It is similar to `Flask.register_blueprint`.
Parameters
----------
blueprint: toui.structure.ToUIBlueprint
options
Same as `Flask.register_blueprint` `options` parameter.
See Also
--------
toui.structure.ToUIBlueprint
flask.Blueprint
"""
self.add_pages(*blueprint.pages, blueprint=blueprint)
self.flask_app.register_blueprint(blueprint=blueprint, **options)
def _add_communication_method(self):
self.flask_app.config['SOCK_SERVER_OPTIONS'] = {'ping_interval': 25}
self._socket = Sock(self.flask_app)
self._socket.route("/toui-communicate")(self._communicate)
def _add_user_vars(self, timeout_interval, gen_sid_algo):
self._user_vars = _UserVars(self, timeout_interval=timeout_interval, gen_sid_algo=gen_sid_algo)
def _session_check(self):
"""This is a private function."""
try:
session.keys()
except RuntimeError as e:
return False
if not "user page" in session.keys():
session['user page'] = None
if not "_user_id" in session.keys():
user_id = self._user_vars._get('user-id')
if user_id:
session['_user_id'] = user_id
return True
def _download(self, path_id):
file_to_download = self._user_vars._get(f'toui-download-{path_id}')
if file_to_download:
return send_file(file_to_download, as_attachment=True)
def _communicate(self, ws):
"""This is a private function."""
validation = self._validate_ws(ws)
if not validation:
info("WebSocket validation returns `False`. No data should be sent or received.")
return
info(f'WebSocket connected: {ws.connected}')
ws.msg_num = 0
ws.pending_messages = {}
ws.pending_pages = []
while True:
valid_message = False
while not valid_message:
data_from_js = ws.receive()
data_validation = self._validate_data(data_from_js)
if not data_validation:
info("Data validation returns `False`. The data will not be used.")
continue
s = time.time()
data_dict = json.loads(data_from_js)
if data_dict.get("type") == "page":
ws.pending_pages.append(data_dict)
valid_message = True
while True:
if len(ws.pending_pages) == 0:
break
data_dict = ws.pending_pages.pop(0)
self._session_check()
func = data_dict['func']
args = data_dict['args']
url = data_dict['url']
new_html = data_dict['html']
new_page = Page(url=url)
new_page.from_str(new_html)
new_page._app = self
new_page._signal_mode = True
new_page._ws = ws
new_page._inherit_functions()
selector_to_element = data_dict['selector-to-element']
if selector_to_element:
for index, arg in enumerate(args):
if type(arg) is dict:
if arg.get('type') == "element":
args[index] = new_page.get_element_from_selector(arg['selector'])
if "uid" in data_dict:
new_page._uid = data_dict['uid']
session['user page'] = new_page
try:
if new_page._func_exists(func):
new_page._call_func(func, *args)
del session['user page']
except Exception as e:
del session['user page']
raise e
e = time.time()
debug(f"TIME: {e - s}s")
def _confirm_user_database_created(self):
if self._user_db_type is None:
raise ToUINotAddedError("You have not created the user database yet. To create it, call the method: `add_user_database_using_sql` or `add_user_database_using_firebase`.")
def _load_user(self, user_id):
return self._user_cls.query.filter_by(id=int(user_id)).first()
def _get_user_details_from_google_token(self, access_token, refresh_token=None):
headers = {"Authorization": f"Bearer {access_token}"}
r = requests.get("https://www.googleapis.com/oauth2/v1/userinfo", headers=headers)
if r.status_code == 200:
return r.json()
elif r.status_code == 401:
if refresh_token:
r = requests.post("https://oauth2.googleapis.com/token", data={
"client_id": self._google_data['client_id'],
"client_secret": self._google_data['client_secret'],
"refresh_token": refresh_token,
"grant_type": "refresh_token"
},
headers={"Content-Type": "application/x-www-form-urlencoded"})
if r.status_code == 200:
return self._get_user_details_from_google_token(r.json()['access_token'])
raise Exception(f"Error getting user details from Google. Status code: {r.status_code}. Response: {r.text}")
def _sign_in_using_google(self):
client_id = self._google_data['client_id']
client_secret = self._google_data['client_secret']
scope = request.args.get("scope")
if self.user_vars._get('google-redirect-uri-host') is not None:
redirect_uri = self.user_vars._get('google-redirect-uri-host') + "/toui-google-sign-in"
else:
redirect_uri = request.base_url
response_type = "code"
access_type = request.args.get("access_type")
state = request.args.get("state")
include_granted_scopes = request.args.get("include_granted_scopes")
enable_granular_consent = request.args.get("enable_granular_consent")
login_hint = request.args.get("login_hint")
prompt = request.args.get("prompt")
after_auth_url = self.user_vars._get('google-after-auth-url')
username = request.args.get("username")
if "code" in request.args:
code = request.args.get("code")
self.user_vars._set('google-redirect-uri-host', None)
dictToSend = {'code':code,
'client_id':client_id,
'client_secret':client_secret,
'grant_type':'authorization_code',
'redirect_uri':redirect_uri}
res = requests.post('https://oauth2.googleapis.com/token', data=dictToSend,
headers={'Host': 'oauth2.googleapis.com',
'Content-Type':'application/x-www-form-urlencoded'})
dictFromServer = res.json()
info(f"Google response keys: {dictFromServer.keys()}")
self.user_vars._set('google-access-token', dictFromServer['access_token'])
if 'refresh_token' in dictFromServer:
self.user_vars._set('google-refresh-token', dictFromServer['refresh_token'])
user_details = self._get_user_details_from_google_token(access_token=dictFromServer['access_token'], refresh_token=self.user_vars._get('google-refresh-token'))
self.user_vars._set('google-user-details', user_details)
email = user_details['email']
if username is None:
username = email
if self.email_exists(email):
users_ids = self.get_users_ids_from_data(email=email)
if len(users_ids) != 1:
error(f"More than one user with email {email} exists in the database.")
else:
success = self.signin_user_from_id(users_ids[0])
if not success:
error("Error signing in user")
else:
success = self.signup_user(email=email, username=username, password=None)
if success:
self.signin_user(email=email, username=username, password=None)
else:
error("Error signing up user")
return redirect(after_auth_url)
else:
# Validating scope
for s in scope.split(" "):
if not s.startswith("https://www.googleapis.com/auth/"):
raise ValueError(f"Invalid scope. Scope should start with `https://www.googleapis.com/auth/`. However your scope is `{s}`")
# Creating redirect_to
redirect_to = f"https://accounts.google.com/o/oauth2/v2/auth?" \
f"client_id={client_id}" \
f"&response_type={response_type}" \
f"&scope={scope}" \
f"&redirect_uri={redirect_uri}"
if access_type is not None:
redirect_to += f"&access_type={access_type}"
if state is not None:
redirect_to += f"&state={state}"
if include_granted_scopes is not None:
if include_granted_scopes is True:
redirect_to += f"&include_granted_scopes=true"
elif include_granted_scopes is False:
redirect_to += f"&include_granted_scopes=false"
if enable_granular_consent is not None:
if enable_granular_consent is True:
redirect_to += f"&enable_granular_consent=true"
elif enable_granular_consent is False:
redirect_to += f"&enable_granular_consent=false"
if login_hint is not None:
redirect_to += f"&login_hint={login_hint}"
if prompt is not None:
redirect_to += f"&prompt={' '.join(prompt)}"
return redirect(redirect_to)
class _FuncWithPage:
"""Currently this class is unused, but it might be used later."""
def __init__(self, func, page):
self.func = func
self.page = page
def __call__(self, *args, **kwargs):
return self.func(*args, **kwargs)
class _UserVars(MutableMapping):
"""User-specific variables"""
def __init__(self, app, timeout_interval, gen_sid_algo) -> None:
self._app = app
self._cache = UserDict()
self._cache.set = self._cache.__setitem__
self._default_vars = {}
self._timeout_interval = timeout_interval
if gen_sid_algo:
self._gen_sid = gen_sid_algo
def _gen_sid(self):
try:
if request.cookies.get('TOUI_SID'):
sid = request.cookies.get('TOUI_SID')
session['toui-sid'] = sid
else:
if "toui-sid" in session:
sid = session['toui-sid']
else:
sid = str(uuid.uuid4())
session['toui-sid'] = sid
response = make_response()
response.set_cookie("TOUI_SID", sid, secure=True, httponly=True)
session['toui-response'] = response
return sid
except RuntimeError:
return None
def _timeout(self, sid):
self._cache.delete(sid)
def _sid_check(self):
sid = self._gen_sid()
if sid:
user_dict = self._cache.get(sid)
if user_dict is None:
self._cache.set(sid, {"toui-vars": self._default_vars.copy()})
threading.Timer(self._timeout_interval, self._timeout, args=[sid]).start()
return sid
def _get_toui_vars(self):
sid = self._sid_check()
if sid:
return self._cache.get(sid)['toui-vars']
else:
return self._default_vars
def _get(self, key):
sid = self._sid_check()
if sid:
return self._cache.get(sid).get(key)
else:
return self._default_vars
def _set(self, key, value):
"""Avoid key='toui-vars'"""
sid = self._sid_check()
if sid:
sid_dict = self._cache.get(sid)
sid_dict[key] = value
else:
self._default_vars[key] = value
def _del(self, key):
sid = self._sid_check()
if sid:
sid_dict = self._cache.get(sid)
if key in sid_dict:
del sid_dict[key]
else:
if key in self._default_vars:
del self._default_vars[key]
def __getitem__(self, key):
return self._get_toui_vars()[key]
def __setitem__(self, key, value):
toui_vars = self._get_toui_vars()
toui_vars[key] = value
def __delitem__(self, key: Any) -> None:
toui_vars = self._get_toui_vars()
del toui_vars[key]
def __iter__(self):
for key in self._get_toui_vars():
yield key
def __len__(self) -> int:
return len(self._get_toui_vars())
def __getattr__(self, name: str) -> Any:
return getattr(self._get_toui_vars(), name)
def __repr__(self) -> str:
return repr(self._get_toui_vars())
[docs]class Website(_App):
"""
A class that creates a web application from HTML files.
Examples
--------
Creating a web app:
>>> from toui import Website
>>> app = Website(__name__, secret_key="some key")
Creating a page and adding it to the app:
>>> from toui import Page
>>> home_page = Page(html_str="<h1>This is the home page</h1>")
>>> app.add_pages(home_page)
Running the app:
>>> if __name__ == "__main__":
... app.run(debug=True) # doctest: +SKIP
See Also
--------
:py:class:`toui.pages.Page`
:py:class:`DesktopApp`
"""
@wraps(Flask.run)
def run(self, *args, **kwargs):
"""
Runs the app. It calls the function `flask.Flask.run`. The arguments will be passed to
`flask.Flask.run` function.
Parameters
----------
args: Any
kwargs: Any
"""
self.flask_app.run(*args, **kwargs)
[docs]class DesktopApp(_App):
"""
A class that creates a desktop application from HTML files.
Examples
--------
Creating a desktop app:
>>> from toui import DesktopApp
>>> app = DesktopApp("MyApp")
Creating a page and adding it to the app:
>>> from toui import Page
>>> home_page = Page(html_str="<h1>This is the home page</h1>")
>>> app.add_pages(home_page)
Running the app:
>>> if __name__ == "__main__":
... app.run() # doctest: +SKIP
See Also
--------
:py:class:`toui.pages.Page`
:py:class:`Website`
"""
def _run_server(self):
self.flask_app.run(port=self._port, use_reloader=False)
@wraps(webview.start)
def run(self, *args, **kwargs):
"""
Runs the app. It calls the function `webview.start`. The arguments will be passed to
`webview.start` function.
Parameters
----------
args: Any
kwargs: Any
"""
if len(self.pages) == 0:
raise Exception("Cannot run the app because no pages were added.")
self._port = webview.http._get_random_port()
t = threading.Thread(target=self._run_server)
t.daemon = True
t.start()
self.pages[0]._create_window()
webview.start(*args, **kwargs)
[docs]def quick_website(name="App", html_file=None, html_str=None, url="/", assets_folder=None, secret_key=None):
"""
Creates a web app and adds a single `Page` to it.
Parameters
----------
name: str (optional)
The name of the app.
html_file: str (optional)
The path to the HTML file that will be used to create the `Page`.
html_str: str (optional)
The content of the `Page`.
url: str (optional)
The URL of the `Page`.
assets_folder: str (optional)
The path to the folder that contains the HTML file. If no HTML files are used,
you can ignore this parameter.
secret_key: str (optional)
Sets the `secret_key` attribute for `flask.Flask`
Returns
-------
Website
"""
app = Website(name=name, assets_folder=assets_folder, secret_key=secret_key)
page = Page(url=url, html_file=html_file, html_str=html_str)
app.add_pages(page)
return app
[docs]def quick_desktop_app(name="App", html_file=None, html_str=None, url="/", assets_folder=None, secret_key=None):
"""
Creates a desktop app and adds a single `Page` to it.
Parameters
----------
name: str (optional)
The name of the app.
html_file: str (optional)
The path to the HTML file that will be used to create the `Page`.
html_str: str (optional)
The content of the `Page`.
url: str (optional)
The URL of the `Page`.
assets_folder: str (optional)
The path to the folder that contains the HTML file. If no HTML files are used,
you can ignore this parameter.
Returns
-------
DesktopApp
"""
app = DesktopApp(name=name, assets_folder=assets_folder, secret_key=secret_key)
page = Page(url=url, html_file=html_file, html_str=html_str)
app.add_pages(page)
return app
[docs]def set_global_app(app):
"""
Allows the app object to be shared across Python modules.
Examples
--------
Suppose you have two Python scripts, "main.py" and "home_page.py". In "main.py", you can create the app
and make it global:
>>> from toui import Website, set_global_app
>>> app = Website(__name__)
>>> set_global_app(app)
While in "home_page.py", you can get the shared app:
>>> from toui import get_global_app
>>> app = get_global_app()
"""
global _global_app
_global_app = app
[docs]def get_global_app() -> _App:
"""
Gets the shared app object.
See :py:meth:`set_global_app`.
"""
return _global_app
if __name__ == "__main__":
import doctest
results = doctest.testmod()
print(results)