"""Utilities for making it easier to write tests."""
# pylint: disable=unused-import
import datetime as dt
import json
import logging
import os
import re
import socket
from itertools import count
from typing import Any, Iterable, List, Optional, Tuple
from django.contrib.auth.models import Group, User
from django.db import models
from django.http import HttpResponse, JsonResponse
from django.test import TestCase
from esi.models import Scope, Token
from allianceauth.authentication.models import CharacterOwnership, State
from allianceauth.eveonline.models import (
EveAllianceInfo,
EveCharacter,
EveCorporationInfo,
EveFactionInfo,
)
from allianceauth.groupmanagement.models import AuthGroup
from allianceauth.tests.auth_utils import AuthUtils
from .datetime import dt_eveformat
from .esi_testing import BravadoOperationStub, BravadoResponseStub # noqa: F401
from .helpers import random_string
[docs]
def generate_invalid_pk(MyModel: Any) -> int:
"""return an invalid PK for the given Django model"""
pk_max = MyModel.objects.aggregate(models.Max("pk"))["pk__max"]
return pk_max + 1 if pk_max else 1
[docs]
class SocketAccessError(Exception):
"""Error raised when a test script accesses the network"""
[docs]
class NoSocketsTestCase(TestCase):
"""Variation of Django's TestCase class that prevents any network use.
Example:
.. code-block:: python
class TestMyStuff(NoSocketsTestCase):
def test_should_do_what_i_need(self):
...
"""
[docs]
@classmethod
def setUpClass(cls):
cls.socket_original = socket.socket
socket.socket = cls.guard
return super().setUpClass()
[docs]
@classmethod
def tearDownClass(cls):
socket.socket = cls.socket_original
return super().tearDownClass()
[docs]
@staticmethod
def guard(*args, **kwargs):
""":meta_private:"""
raise SocketAccessError("Attempted to access network")
[docs]
def set_test_logger(logger_name: str, name: str) -> object:
"""set logger for current test module
Args:
logger: current logger object
name: name of current module, e.g. __file__
Returns:
amended logger
"""
# reconfigure logger so we get logging from tested module
f_format = logging.Formatter(
"%(asctime)s - %(levelname)s - %(module)s:%(funcName)s - %(message)s"
)
file_name = os.path.splitext(name)[0]
f_handler = logging.FileHandler(f"{file_name}.log", "w+")
f_handler.setFormatter(f_format)
my_logger = logging.getLogger(logger_name)
my_logger.level = logging.DEBUG
my_logger.addHandler(f_handler)
my_logger.propagate = False
return my_logger
[docs]
def queryset_pks(queryset) -> set:
"""shortcut that returns the pks of the given queryset as set.
Useful for comparing test results.
"""
return set(queryset.values_list("pk", flat=True))
[docs]
def response_text(response: HttpResponse) -> str:
"""Return content of a HTTP response as string."""
return response.content.decode("utf-8")
[docs]
def json_response_to_python(response: JsonResponse) -> Any:
"""Convert JSON response into Python object."""
return json.loads(response_text(response))
[docs]
def json_response_to_dict(response: JsonResponse, key="id") -> dict:
"""Convert JSON response into dict by given key."""
return {x[key]: x for x in json_response_to_python(response)}
[docs]
def multi_assert_in(items: Iterable, container: Iterable) -> bool:
"""Return True if all items are in container."""
for item in items:
if item not in container:
return False
return True
[docs]
def multi_assert_not_in(items: Iterable, container: Iterable) -> bool:
"""Return True if none of the item is in container."""
for item in items:
if item in container:
return False
return True
# factories
[docs]
def add_new_token(
user: User,
character: EveCharacter,
scopes: Optional[List[str]] = None,
owner_hash: Optional[str] = None,
) -> Token:
"""Generate a new token for a user based on a character."""
return _store_as_Token(
_generate_token(
character_id=character.character_id,
character_name=character.character_name,
owner_hash=owner_hash,
scopes=scopes,
),
user,
)
def _generate_token(
character_id: int,
character_name: str,
owner_hash: Optional[str] = None,
access_token: str = "access_token",
refresh_token: str = "refresh_token",
scopes: Optional[list] = None,
timestamp_dt: Optional[dt.datetime] = None,
expires_in: int = 1200,
) -> dict:
"""Generates the input to create a new SSO test token"""
if timestamp_dt is None:
timestamp_dt = dt.datetime.utcnow()
if scopes is None:
scopes = [
"esi-mail.read_mail.v1",
"esi-wallet.read_character_wallet.v1",
"esi-universe.read_structures.v1",
]
if owner_hash is None:
owner_hash = random_string(28)
token = {
"access_token": access_token,
"token_type": "Bearer",
"expires_in": expires_in,
"refresh_token": refresh_token,
"timestamp": int(timestamp_dt.timestamp()),
"CharacterID": character_id,
"CharacterName": character_name,
"ExpiresOn": dt_eveformat(timestamp_dt + dt.timedelta(seconds=expires_in)),
"Scopes": " ".join(list(scopes)),
"TokenType": "Character",
"CharacterOwnerHash": owner_hash,
"IntellectualProperty": "EVE",
}
return token
def _store_as_Token(token: dict, user: object) -> Token:
"""Stores a generated token dict as Token object for given user
returns Token object
"""
character_tokens = user.token_set.filter(character_id=token["CharacterID"])
if character_tokens.exists():
token["CharacterOwnerHash"] = character_tokens.first().character_owner_hash
obj = Token.objects.create(
access_token=token["access_token"],
refresh_token=token["refresh_token"],
user=user,
character_id=token["CharacterID"],
character_name=token["CharacterName"],
token_type=token["TokenType"],
character_owner_hash=token["CharacterOwnerHash"],
)
for scope_name in token["Scopes"].split(" "):
scope, _ = Scope.objects.get_or_create(name=scope_name)
obj.scopes.add(scope)
return obj
[docs]
def create_user_from_evecharacter(
character_id: int,
permissions: Optional[List[str]] = None,
scopes: Optional[List[str]] = None,
) -> Tuple[User, CharacterOwnership]:
"""Create new allianceauth user from EveCharacter object.
Args:
character_id: ID of eve character
permissions: list of permission names, e.g. `"my_app.my_permission"`
scopes: list of scope names
"""
auth_character = EveCharacter.objects.get(character_id=character_id)
user = AuthUtils.create_user(auth_character.character_name.replace(" ", "_"))
character_ownership = add_character_to_user(
user, auth_character, is_main=True, scopes=scopes
)
if permissions:
for permission_name in permissions:
user = AuthUtils.add_permission_to_user_by_name(permission_name, user)
return user, character_ownership
[docs]
def add_character_to_user(
user: User,
character: EveCharacter,
is_main: bool = False,
scopes: Optional[List[str]] = None,
disconnect_signals: bool = False,
) -> CharacterOwnership:
"""Generates a token for the given Eve character and makes the given user it's owner
Args:
user: New character owner
character: Character to add
is_main: Will set character as the users's main when True
scopes: List of scopes for the token
disconnect_signals: Will disconnect signals temporarily when True
"""
if not scopes:
scopes = ["publicData"]
if disconnect_signals:
AuthUtils.disconnect_signals()
add_new_token(user, character, scopes)
if is_main:
user.profile.main_character = character
user.profile.save()
user.save()
if disconnect_signals:
AuthUtils.connect_signals()
return CharacterOwnership.objects.get(user=user, character=character)
[docs]
def add_character_to_user_2(
user: User,
character_id,
character_name,
corporation_id,
corporation_name,
alliance_id=None,
alliance_name=None,
disconnect_signals=False,
) -> EveCharacter:
"""Creates a new EVE character and makes the given user the owner"""
defaults = {
"character_name": str(character_name),
"corporation_id": int(corporation_id),
"corporation_name": str(corporation_name),
}
if alliance_id:
defaults["alliance_id"] = int(alliance_id)
defaults["alliance_name"] = str(alliance_name)
if disconnect_signals:
AuthUtils.disconnect_signals()
character, _ = EveCharacter.objects.update_or_create(
character_id=character_id, defaults=defaults
)
CharacterOwnership.objects.create(
character=character, owner_hash=f"{character_id}_{character_name}", user=user
)
if disconnect_signals:
AuthUtils.connect_signals()
return character
[docs]
def create_fake_user(
character_id: int,
character_name: str,
corporation_id: Optional[int] = None,
corporation_name: Optional[str] = None,
corporation_ticker: Optional[str] = None,
alliance_id: Optional[int] = None,
alliance_name: Optional[str] = None,
permissions: Optional[List[str]] = None,
) -> User:
"""Create a fake user incl. main character and (optional) permissions.
Will use default corporation and alliance if not set.
"""
username = re.sub(r"[^\w\d@\.\+-]", "_", character_name)
user = AuthUtils.create_user(username)
if not corporation_id:
corporation_id = 2001
corporation_name = "Wayne Technologies Inc."
corporation_ticker = "WTE"
if corporation_id == 2001:
alliance_id = 3001
alliance_name = "Wayne Enterprises"
AuthUtils.add_main_character_2(
user=user,
name=character_name,
character_id=character_id,
corp_id=corporation_id,
corp_name=corporation_name,
corp_ticker=corporation_ticker,
alliance_id=alliance_id,
alliance_name=alliance_name,
)
if permissions:
perm_objs = [AuthUtils.get_permission_by_name(perm) for perm in permissions]
user = AuthUtils.add_permissions_to_user(perms=perm_objs, user=user)
return user
[docs]
def create_authgroup(states: Optional[Iterable[State]] = None, **kwargs) -> Group:
"""Create Group object with additional Auth related properties for tests."""
if "name" not in kwargs:
kwargs["name"] = f"Test Group #{next_number('authgroup')}"
name = kwargs.pop("name")
group = Group.objects.create(name=name)
if states:
group.authgroup.states.add(*states)
if kwargs:
AuthGroup.objects.filter(group=group).update(**kwargs)
group.authgroup.refresh_from_db()
return group
[docs]
def create_state(
priority: int,
permissions: Optional[Iterable[str]] = None,
member_characters: Optional[Iterable[EveCharacter]] = None,
member_corporations: Optional[Iterable[EveCorporationInfo]] = None,
member_alliances: Optional[Iterable[EveAllianceInfo]] = None,
member_factions: Optional[Iterable[EveFactionInfo]] = None,
**kwargs,
) -> State:
"""Create a State object for tests."""
params = {"priority": priority, "name": f"Test State #{next_number('state_name')}"}
params.update(kwargs)
obj = State.objects.create(**params)
if permissions:
perm_objs = [AuthUtils.get_permission_by_name(perm) for perm in permissions]
obj.permissions.add(*perm_objs)
if member_characters:
obj.member_characters.add(*member_characters)
if member_corporations:
obj.member_corporations.add(*member_corporations)
if member_alliances:
obj.member_alliances.add(*member_alliances)
if member_factions:
obj.member_factions.add(*member_factions)
return obj
[docs]
def create_eve_character(
character_id: int, character_name: str, **kwargs
) -> EveCharacter:
"""Create an EveCharacter object for tests."""
params = {
"character_id": character_id,
"character_name": character_name,
"corporation_id": 2001,
"corporation_name": "Wayne Technologies",
"corporation_ticker": "WYT",
"alliance_id": 3001,
"alliance_name": "Wayne Enterprises",
"alliance_ticker": "WYE",
}
params.update(kwargs)
return EveCharacter.objects.create(**params)
[docs]
def next_number(key=None) -> int:
"""Generate consecutive numbers. Optionally numbers are generates for given key."""
if key is None:
key = "_general"
try:
return next(next_number._counter[key])
except AttributeError:
next_number._counter = {}
except KeyError:
pass
next_number._counter[key] = count(start=1)
return next(next_number._counter[key])