mirror of
http://git.carcosa.net/jmcbray/brutaldon.git
synced 2024-11-23 07:13:52 -05:00
Move utility functions out of views.py
This commit is contained in:
parent
0c5f79c3f6
commit
223f3149cd
1
.gitignore
vendored
1
.gitignore
vendored
@ -115,4 +115,5 @@ node_modules
|
||||
package-lock.json
|
||||
yarn.lock
|
||||
/db.sqlite3-*
|
||||
*~
|
||||
|
||||
|
215
brutaldon/utils.py
Normal file
215
brutaldon/utils.py
Normal file
@ -0,0 +1,215 @@
|
||||
from requests import Session
|
||||
from django.conf import settings as django_settings
|
||||
from brutaldon.models import Client, Account
|
||||
from mastodon import Mastodon, MastodonAPIError
|
||||
from django.http import HttpResponseRedirect
|
||||
import re
|
||||
|
||||
global sessons_cache
|
||||
sessions_cache = {}
|
||||
|
||||
VISIBILITIES = ["direct", "private", "unlisted", "public"]
|
||||
|
||||
|
||||
class NotLoggedInException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class LabeledList(list):
|
||||
"""A subclass of list that can accept additional attributes"""
|
||||
|
||||
def __new__(self, *args, **kwargs):
|
||||
return super(LabeledList, self).__new__(self, args, kwargs)
|
||||
|
||||
def __init(self, *args, **kwargs):
|
||||
if len(args) == 1 and hasattr(args[0], "__iter__"):
|
||||
list.__init__(self, args[0])
|
||||
else:
|
||||
list.__init__(self, args)
|
||||
self.__dict__.update(kwargs)
|
||||
|
||||
def __call(self, **kwargs):
|
||||
self.__dict__.update(kwargs)
|
||||
return self
|
||||
|
||||
|
||||
###
|
||||
### Utility functions
|
||||
###
|
||||
|
||||
|
||||
def get_session(domain):
|
||||
if domain in sessions_cache:
|
||||
return sessions_cache[domain]
|
||||
else:
|
||||
s = Session()
|
||||
sessions_cache[domain] = s
|
||||
return s
|
||||
|
||||
|
||||
def get_usercontext(request, feature_set="mainline"):
|
||||
if is_logged_in(request):
|
||||
try:
|
||||
client = Client.objects.get(api_base_id=request.session["active_instance"])
|
||||
user = Account.objects.get(username=request.session["active_username"])
|
||||
except (
|
||||
Client.DoesNotExist,
|
||||
Client.MultipleObjectsReturned,
|
||||
Account.DoesNotExist,
|
||||
Account.MultipleObjectsReturned,
|
||||
):
|
||||
raise NotLoggedInException()
|
||||
mastodon = Mastodon(
|
||||
client_id=client.client_id,
|
||||
client_secret=client.client_secret,
|
||||
access_token=user.access_token,
|
||||
api_base_url=client.api_base_id,
|
||||
session=get_session(client.api_base_id),
|
||||
ratelimit_method="throw",
|
||||
feature_set=feature_set,
|
||||
)
|
||||
return user, mastodon
|
||||
else:
|
||||
return None, None
|
||||
|
||||
|
||||
def is_logged_in(request):
|
||||
return request.session.has_key("active_user")
|
||||
|
||||
|
||||
def _notes_count(account, mastodon):
|
||||
if not mastodon:
|
||||
return ""
|
||||
notes = mastodon.notifications(limit=40)
|
||||
if account.preferences.filter_notifications:
|
||||
notes = [
|
||||
note for note in notes if note.type == "mention" or note.type == "follow"
|
||||
]
|
||||
for index, item in enumerate(notes):
|
||||
if account.note_seen is None:
|
||||
account.note_seen = "0"
|
||||
account.save()
|
||||
if str(item.id) <= str(account.note_seen):
|
||||
break
|
||||
else:
|
||||
index = "40+"
|
||||
return str(index)
|
||||
|
||||
|
||||
def br_login_required(function=None, home_url=None, redirect_field_name=None):
|
||||
"""Check that the user is logged in to a Mastodon instance.
|
||||
|
||||
This decorator ensures that the view functions it is called on can be
|
||||
accessed only by logged in users. When an instanceless user accesses
|
||||
such a protected view, they are redirected to the address specified in
|
||||
the field named in `next_field` or, lacking such a value, the URL in
|
||||
`home_url`, or the `ANONYMOUS_HOME_URL` setting.
|
||||
"""
|
||||
if home_url is None:
|
||||
home_url = django_settings.ANONYMOUS_HOME_URL
|
||||
|
||||
def _dec(view_func):
|
||||
def _view(request, *args, **kwargs):
|
||||
def not_logged_in():
|
||||
url = None
|
||||
if redirect_field_name and redirect_field_name in request.REQUEST:
|
||||
url = request.REQUEST[redirect_field_name]
|
||||
if not url:
|
||||
url = home_url
|
||||
if not url:
|
||||
url = "/"
|
||||
return HttpResponseRedirect(url)
|
||||
|
||||
if not is_logged_in(request):
|
||||
return not_logged_in()
|
||||
else:
|
||||
try:
|
||||
return view_func(request, *args, **kwargs)
|
||||
except MastodonAPIError:
|
||||
# mastodon must have expired our session
|
||||
return not_logged_in()
|
||||
|
||||
_view.__name__ = view_func.__name__
|
||||
_view.__dict__ = view_func.__dict__
|
||||
_view.__doc__ = view_func.__doc__
|
||||
|
||||
return _view
|
||||
|
||||
if function is None:
|
||||
return _dec
|
||||
else:
|
||||
return _dec(function)
|
||||
|
||||
|
||||
def min_visibility(visibility1, visibility2):
|
||||
return VISIBILITIES[
|
||||
min(VISIBILITIES.index(visibility1), VISIBILITIES.index(visibility2))
|
||||
]
|
||||
|
||||
|
||||
def get_filters(mastodon, context=None):
|
||||
try:
|
||||
if context:
|
||||
return [ff for ff in mastodon.filters() if context in ff.context]
|
||||
else:
|
||||
return mastodon.filters()
|
||||
except:
|
||||
return []
|
||||
|
||||
|
||||
def toot_matches_filters(toot, filters=[]):
|
||||
if not filters:
|
||||
return False
|
||||
|
||||
def maybe_rewrite_filter(filter):
|
||||
if filter.whole_word:
|
||||
return f"\\b{filter.phrase}\\b"
|
||||
else:
|
||||
return filter.phrase
|
||||
|
||||
phrases = [maybe_rewrite_filter(x) for x in filters]
|
||||
pattern = "|".join(phrases)
|
||||
try:
|
||||
if toot.get("type") in ["reblog", "favourite"]:
|
||||
return re.search(
|
||||
pattern, toot.status.spoiler_text + toot.status.content, re.I
|
||||
)
|
||||
return re.search(pattern, toot.spoiler_text + toot.content, re.I)
|
||||
except:
|
||||
return False
|
||||
|
||||
|
||||
def switch_accounts(request, new_account):
|
||||
"""Try to switch accounts to the specified account, if it is already in
|
||||
the user's session. Sets up new session variables. Returns boolean success
|
||||
code."""
|
||||
accounts_dict = request.session.get("accounts_dict")
|
||||
if not accounts_dict or not new_account in accounts_dict.keys():
|
||||
return False
|
||||
try:
|
||||
account = Account.objects.get(id=accounts_dict[new_account]["account_id"])
|
||||
if account.username != new_account:
|
||||
return False
|
||||
except Account.DoesNotExist:
|
||||
return False
|
||||
|
||||
request.session["active_username"] = account.username
|
||||
request.session["active_instance"] = account.client.api_base_id
|
||||
account, mastodon = get_usercontext(request)
|
||||
request.session["active_user"] = mastodon.account_verify_credentials()
|
||||
accounts_dict[new_account]["user"] = request.session["active_user"]
|
||||
request.session["accounts_dict"] = accounts_dict
|
||||
return True
|
||||
|
||||
|
||||
def same_username(account, acct, username):
|
||||
if acct == username:
|
||||
return True
|
||||
try:
|
||||
user, host = username.split("@", 1)
|
||||
except ValueError:
|
||||
user, host = username, ""
|
||||
myhost = account.username.split("@", 1)[1]
|
||||
if acct == user and host == myhost:
|
||||
return True
|
||||
return False
|
@ -4,7 +4,6 @@ from django.conf import settings as django_settings
|
||||
from django.shortcuts import render, redirect
|
||||
from django.urls import reverse
|
||||
from django.views.decorators.cache import never_cache, cache_page
|
||||
from django.core.files.uploadhandler import TemporaryFileUploadHandler
|
||||
from django.utils.translation import gettext as _
|
||||
from brutaldon.forms import (
|
||||
LoginForm,
|
||||
@ -21,149 +20,26 @@ from mastodon import (
|
||||
MastodonAPIError,
|
||||
MastodonNotFoundError,
|
||||
)
|
||||
from brutaldon.utils import (
|
||||
NotLoggedInException,
|
||||
LabeledList,
|
||||
get_usercontext,
|
||||
_notes_count,
|
||||
br_login_required,
|
||||
min_visibility,
|
||||
get_filters,
|
||||
switch_accounts,
|
||||
toot_matches_filters,
|
||||
same_username,
|
||||
)
|
||||
|
||||
from urllib import parse
|
||||
from itertools import groupby
|
||||
from inscriptis import get_text
|
||||
from time import sleep
|
||||
from requests import Session
|
||||
import re
|
||||
|
||||
|
||||
class NotLoggedInException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class LabeledList(list):
|
||||
"""A subclass of list that can accept additional attributes"""
|
||||
|
||||
def __new__(self, *args, **kwargs):
|
||||
return super(LabeledList, self).__new__(self, args, kwargs)
|
||||
|
||||
def __init(self, *args, **kwargs):
|
||||
if len(args) == 1 and hasattr(args[0], "__iter__"):
|
||||
list.__init__(self, args[0])
|
||||
else:
|
||||
list.__init__(self, args)
|
||||
self.__dict__.update(kwargs)
|
||||
|
||||
def __call(self, **kwargs):
|
||||
self.__dict__.update(kwargs)
|
||||
return self
|
||||
|
||||
|
||||
global sessons_cache
|
||||
sessions_cache = {}
|
||||
|
||||
VISIBILITIES = ["direct", "private", "unlisted", "public"]
|
||||
|
||||
###
|
||||
### Utility functions
|
||||
###
|
||||
|
||||
|
||||
def get_session(domain):
|
||||
if domain in sessions_cache:
|
||||
return sessions_cache[domain]
|
||||
else:
|
||||
s = Session()
|
||||
sessions_cache[domain] = s
|
||||
return s
|
||||
|
||||
|
||||
def get_usercontext(request, feature_set="mainline"):
|
||||
if is_logged_in(request):
|
||||
try:
|
||||
client = Client.objects.get(api_base_id=request.session["active_instance"])
|
||||
user = Account.objects.get(username=request.session["active_username"])
|
||||
except (
|
||||
Client.DoesNotExist,
|
||||
Client.MultipleObjectsReturned,
|
||||
Account.DoesNotExist,
|
||||
Account.MultipleObjectsReturned,
|
||||
):
|
||||
raise NotLoggedInException()
|
||||
mastodon = Mastodon(
|
||||
client_id=client.client_id,
|
||||
client_secret=client.client_secret,
|
||||
access_token=user.access_token,
|
||||
api_base_url=client.api_base_id,
|
||||
session=get_session(client.api_base_id),
|
||||
ratelimit_method="throw",
|
||||
feature_set=feature_set,
|
||||
)
|
||||
return user, mastodon
|
||||
else:
|
||||
return None, None
|
||||
|
||||
|
||||
def is_logged_in(request):
|
||||
return request.session.has_key("active_user")
|
||||
|
||||
|
||||
def _notes_count(account, mastodon):
|
||||
if not mastodon:
|
||||
return ""
|
||||
notes = mastodon.notifications(limit=40)
|
||||
if account.preferences.filter_notifications:
|
||||
notes = [
|
||||
note for note in notes if note.type == "mention" or note.type == "follow"
|
||||
]
|
||||
for index, item in enumerate(notes):
|
||||
if account.note_seen is None:
|
||||
account.note_seen = "0"
|
||||
account.save()
|
||||
if str(item.id) <= str(account.note_seen):
|
||||
break
|
||||
else:
|
||||
index = "40+"
|
||||
return str(index)
|
||||
|
||||
|
||||
def br_login_required(function=None, home_url=None, redirect_field_name=None):
|
||||
"""Check that the user is logged in to a Mastodon instance.
|
||||
|
||||
This decorator ensures that the view functions it is called on can be
|
||||
accessed only by logged in users. When an instanceless user accesses
|
||||
such a protected view, they are redirected to the address specified in
|
||||
the field named in `next_field` or, lacking such a value, the URL in
|
||||
`home_url`, or the `ANONYMOUS_HOME_URL` setting.
|
||||
"""
|
||||
if home_url is None:
|
||||
home_url = django_settings.ANONYMOUS_HOME_URL
|
||||
|
||||
def _dec(view_func):
|
||||
def _view(request, *args, **kwargs):
|
||||
def not_logged_in():
|
||||
url = None
|
||||
if redirect_field_name and redirect_field_name in request.REQUEST:
|
||||
url = request.REQUEST[redirect_field_name]
|
||||
if not url:
|
||||
url = home_url
|
||||
if not url:
|
||||
url = "/"
|
||||
return HttpResponseRedirect(url)
|
||||
|
||||
if not is_logged_in(request):
|
||||
return not_logged_in()
|
||||
else:
|
||||
try:
|
||||
return view_func(request, *args, **kwargs)
|
||||
except MastodonAPIError:
|
||||
# mastodon must have expired our session
|
||||
return not_logged_in()
|
||||
|
||||
_view.__name__ = view_func.__name__
|
||||
_view.__dict__ = view_func.__dict__
|
||||
_view.__doc__ = view_func.__doc__
|
||||
|
||||
return _view
|
||||
|
||||
if function is None:
|
||||
return _dec
|
||||
else:
|
||||
return _dec(function)
|
||||
|
||||
|
||||
def user_search(request):
|
||||
check = request.POST.get("status", "").split()
|
||||
if len(check):
|
||||
@ -191,12 +67,6 @@ def user_search_inner(request, query):
|
||||
)
|
||||
|
||||
|
||||
def min_visibility(visibility1, visibility2):
|
||||
return VISIBILITIES[
|
||||
min(VISIBILITIES.index(visibility1), VISIBILITIES.index(visibility2))
|
||||
]
|
||||
|
||||
|
||||
def timeline(
|
||||
request,
|
||||
timeline="home",
|
||||
@ -253,61 +123,6 @@ def timeline(
|
||||
)
|
||||
|
||||
|
||||
def get_filters(mastodon, context=None):
|
||||
try:
|
||||
if context:
|
||||
return [ff for ff in mastodon.filters() if context in ff.context]
|
||||
else:
|
||||
return mastodon.filters()
|
||||
except:
|
||||
return []
|
||||
|
||||
|
||||
def toot_matches_filters(toot, filters=[]):
|
||||
if not filters:
|
||||
return False
|
||||
|
||||
def maybe_rewrite_filter(filter):
|
||||
if filter.whole_word:
|
||||
return f"\\b{filter.phrase}\\b"
|
||||
else:
|
||||
return filter.phrase
|
||||
|
||||
phrases = [maybe_rewrite_filter(x) for x in filters]
|
||||
pattern = "|".join(phrases)
|
||||
try:
|
||||
if toot.get("type") in ["reblog", "favourite"]:
|
||||
return re.search(
|
||||
pattern, toot.status.spoiler_text + toot.status.content, re.I
|
||||
)
|
||||
return re.search(pattern, toot.spoiler_text + toot.content, re.I)
|
||||
except:
|
||||
return False
|
||||
|
||||
|
||||
def switch_accounts(request, new_account):
|
||||
"""Try to switch accounts to the specified account, if it is already in
|
||||
the user's session. Sets up new session variables. Returns boolean success
|
||||
code."""
|
||||
accounts_dict = request.session.get("accounts_dict")
|
||||
if not accounts_dict or not new_account in accounts_dict.keys():
|
||||
return False
|
||||
try:
|
||||
account = Account.objects.get(id=accounts_dict[new_account]["account_id"])
|
||||
if account.username != new_account:
|
||||
return False
|
||||
except Account.DoesNotExist:
|
||||
return False
|
||||
|
||||
request.session["active_username"] = account.username
|
||||
request.session["active_instance"] = account.client.api_base_id
|
||||
account, mastodon = get_usercontext(request)
|
||||
request.session["active_user"] = mastodon.account_verify_credentials()
|
||||
accounts_dict[new_account]["user"] = request.session["active_user"]
|
||||
request.session["accounts_dict"] = accounts_dict
|
||||
return True
|
||||
|
||||
|
||||
def forget_account(request, account_name):
|
||||
"""Forget that you were logged into an account. If it's the last one, log out
|
||||
entirely. Sets up session variables. Returns a redirect to the correct
|
||||
@ -718,19 +533,6 @@ def thread(request, id):
|
||||
)
|
||||
|
||||
|
||||
def same_username(account, acct, username):
|
||||
if acct == username:
|
||||
return True
|
||||
try:
|
||||
user, host = username.split("@", 1)
|
||||
except ValueError:
|
||||
user, host = username, ""
|
||||
myhost = account.username.split("@", 1)[1]
|
||||
if acct == user and host == myhost:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
@br_login_required
|
||||
def user(request, username, prev=None, next=None):
|
||||
try:
|
||||
|
Loading…
Reference in New Issue
Block a user