server/users: add editing users
This commit is contained in:
parent
8bdcb04665
commit
1ed17a2046
5 changed files with 228 additions and 10 deletions
|
@ -1,9 +1,11 @@
|
||||||
''' Exports UserListApi and UserDetailApi. '''
|
''' Exports UserListApi and UserDetailApi. '''
|
||||||
|
|
||||||
|
import re
|
||||||
import sqlalchemy
|
import sqlalchemy
|
||||||
from szurubooru.api.base_api import BaseApi
|
from szurubooru.api.base_api import BaseApi
|
||||||
from szurubooru.errors import IntegrityError, ValidationError, NotFoundError
|
from szurubooru.errors import IntegrityError, ValidationError, NotFoundError, AuthError
|
||||||
from szurubooru.services.search import UserSearchConfig, SearchExecutor
|
from szurubooru.services.search import UserSearchConfig, SearchExecutor
|
||||||
|
from szurubooru.util import is_valid_email
|
||||||
|
|
||||||
def _serialize_user(authenticated_user, user):
|
def _serialize_user(authenticated_user, user):
|
||||||
ret = {
|
ret = {
|
||||||
|
@ -45,7 +47,7 @@ class UserListApi(BaseApi):
|
||||||
self._auth_service.verify_privilege(context.user, 'users:create')
|
self._auth_service.verify_privilege(context.user, 'users:create')
|
||||||
|
|
||||||
try:
|
try:
|
||||||
name = context.request['name']
|
name = context.request['name'].strip()
|
||||||
password = context.request['password']
|
password = context.request['password']
|
||||||
email = context.request['email'].strip()
|
email = context.request['email'].strip()
|
||||||
except KeyError as ex:
|
except KeyError as ex:
|
||||||
|
@ -61,8 +63,12 @@ class UserListApi(BaseApi):
|
||||||
|
|
||||||
class UserDetailApi(BaseApi):
|
class UserDetailApi(BaseApi):
|
||||||
''' API for individual users. '''
|
''' API for individual users. '''
|
||||||
def __init__(self, auth_service, user_service):
|
def __init__(self, config, auth_service, password_service, user_service):
|
||||||
super().__init__()
|
super().__init__()
|
||||||
|
self._available_access_ranks = config['service']['user_ranks']
|
||||||
|
self._name_regex = config['service']['user_name_regex']
|
||||||
|
self._password_regex = config['service']['password_regex']
|
||||||
|
self._password_service = password_service
|
||||||
self._auth_service = auth_service
|
self._auth_service = auth_service
|
||||||
self._user_service = user_service
|
self._user_service = user_service
|
||||||
|
|
||||||
|
@ -76,5 +82,65 @@ class UserDetailApi(BaseApi):
|
||||||
|
|
||||||
def put(self, request, context, user_name):
|
def put(self, request, context, user_name):
|
||||||
''' Updates an existing user. '''
|
''' Updates an existing user. '''
|
||||||
self._auth_service.verify_privilege(context.user, 'users:edit')
|
user = self._user_service.get_by_name(context.session, user_name)
|
||||||
return {'message': 'Updating user ' + user_name}
|
if not user:
|
||||||
|
raise NotFoundError('User %r not found.' % user_name)
|
||||||
|
|
||||||
|
if context.user.user_id == user.user_id:
|
||||||
|
infix = 'self'
|
||||||
|
else:
|
||||||
|
infix = 'any'
|
||||||
|
|
||||||
|
if 'name' in context.request:
|
||||||
|
self._auth_service.verify_privilege(
|
||||||
|
context.user, 'users:edit:%s:name' % infix)
|
||||||
|
name = context.request['name'].strip()
|
||||||
|
if not re.match(self._name_regex, name):
|
||||||
|
raise ValidationError(
|
||||||
|
'Name must satisfy regex %r.' % self._name_regex)
|
||||||
|
user.name = name
|
||||||
|
|
||||||
|
if 'password' in context.request:
|
||||||
|
password = context.request['password']
|
||||||
|
self._auth_service.verify_privilege(
|
||||||
|
context.user, 'users:edit:%s:pass' % infix)
|
||||||
|
if not re.match(self._password_regex, password):
|
||||||
|
raise ValidationError(
|
||||||
|
'Password must satisfy regex %r.' % self._password_regex)
|
||||||
|
user.password_salt = self._password_service.create_password()
|
||||||
|
user.password_hash = self._password_service.get_password_hash(
|
||||||
|
user.password_salt, password)
|
||||||
|
|
||||||
|
if 'email' in context.request:
|
||||||
|
self._auth_service.verify_privilege(
|
||||||
|
context.user, 'users:edit:%s:email' % infix)
|
||||||
|
email = context.request['email'].strip()
|
||||||
|
if not is_valid_email(email):
|
||||||
|
raise ValidationError('%r is not a vaild email address.' % email)
|
||||||
|
# prefer nulls to empty strings in the DB
|
||||||
|
if not email:
|
||||||
|
email = None
|
||||||
|
user.email = email
|
||||||
|
|
||||||
|
if 'accessRank' in context.request:
|
||||||
|
self._auth_service.verify_privilege(
|
||||||
|
context.user, 'users:edit:%s:rank' % infix)
|
||||||
|
rank = context.request['accessRank'].strip()
|
||||||
|
if not rank in self._available_access_ranks:
|
||||||
|
raise ValidationError(
|
||||||
|
'Bad access rank. Valid access ranks: %r' \
|
||||||
|
% self._available_access_ranks)
|
||||||
|
if self._available_access_ranks.index(context.user.access_rank) \
|
||||||
|
< self._available_access_ranks.index(rank):
|
||||||
|
raise AuthError(
|
||||||
|
'Trying to set higher access rank than one has')
|
||||||
|
user.access_rank = rank
|
||||||
|
|
||||||
|
# TODO: avatar
|
||||||
|
|
||||||
|
try:
|
||||||
|
context.session.commit()
|
||||||
|
except sqlalchemy.exc.IntegrityError:
|
||||||
|
raise IntegrityError('User %r already exists.' % name)
|
||||||
|
|
||||||
|
return {'user': _serialize_user(context.user, user)}
|
||||||
|
|
|
@ -67,8 +67,9 @@ def create_app():
|
||||||
auth_service = szurubooru.services.AuthService(config, password_service)
|
auth_service = szurubooru.services.AuthService(config, password_service)
|
||||||
user_service = szurubooru.services.UserService(config, password_service)
|
user_service = szurubooru.services.UserService(config, password_service)
|
||||||
|
|
||||||
user_list = szurubooru.api.UserListApi(auth_service, user_service)
|
user_list_api = szurubooru.api.UserListApi(auth_service, user_service)
|
||||||
user = szurubooru.api.UserDetailApi(auth_service, user_service)
|
user_detail_api = szurubooru.api.UserDetailApi(
|
||||||
|
config, auth_service, password_service, user_service)
|
||||||
|
|
||||||
app = falcon.API(
|
app = falcon.API(
|
||||||
request_type=_CustomRequest,
|
request_type=_CustomRequest,
|
||||||
|
@ -85,7 +86,7 @@ def create_app():
|
||||||
app.add_error_handler(szurubooru.errors.SearchError, _on_search_error)
|
app.add_error_handler(szurubooru.errors.SearchError, _on_search_error)
|
||||||
app.add_error_handler(szurubooru.errors.NotFoundError, _on_not_found_error)
|
app.add_error_handler(szurubooru.errors.NotFoundError, _on_not_found_error)
|
||||||
|
|
||||||
app.add_route('/users/', user_list)
|
app.add_route('/users/', user_list_api)
|
||||||
app.add_route('/user/{user_name}', user)
|
app.add_route('/user/{user_name}', user_detail_api)
|
||||||
|
|
||||||
return app
|
return app
|
||||||
|
|
|
@ -4,6 +4,7 @@ import re
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from szurubooru.errors import ValidationError
|
from szurubooru.errors import ValidationError
|
||||||
from szurubooru.model.user import User
|
from szurubooru.model.user import User
|
||||||
|
from szurubooru.util import is_valid_email
|
||||||
|
|
||||||
class UserService(object):
|
class UserService(object):
|
||||||
''' User management '''
|
''' User management '''
|
||||||
|
@ -25,6 +26,9 @@ class UserService(object):
|
||||||
raise ValidationError(
|
raise ValidationError(
|
||||||
'Password must satisfy regex %r.' % self._password_regex)
|
'Password must satisfy regex %r.' % self._password_regex)
|
||||||
|
|
||||||
|
if not is_valid_email(email):
|
||||||
|
raise ValidationError('%r is not a vaild email address.' % email)
|
||||||
|
|
||||||
# prefer nulls to empty strings in the DB
|
# prefer nulls to empty strings in the DB
|
||||||
if not email:
|
if not email:
|
||||||
email = None
|
email = None
|
||||||
|
|
143
server/szurubooru/tests/api/test_updating_user.py
Normal file
143
server/szurubooru/tests/api/test_updating_user.py
Normal file
|
@ -0,0 +1,143 @@
|
||||||
|
from datetime import datetime
|
||||||
|
import szurubooru.services
|
||||||
|
from szurubooru.api.user_api import UserDetailApi
|
||||||
|
from szurubooru.errors import AuthError, ValidationError
|
||||||
|
from szurubooru.model.user import User
|
||||||
|
from szurubooru.tests.database_test_case import DatabaseTestCase
|
||||||
|
from szurubooru.util import dotdict
|
||||||
|
|
||||||
|
class TestUserDetailApi(DatabaseTestCase):
|
||||||
|
def setUp(self):
|
||||||
|
super().setUp()
|
||||||
|
config = {
|
||||||
|
'basic': {
|
||||||
|
'secret': '',
|
||||||
|
},
|
||||||
|
'service': {
|
||||||
|
'user_name_regex': '.{3,}',
|
||||||
|
'password_regex': '.{3,}',
|
||||||
|
'user_ranks': ['anonymous', 'regular_user', 'mod', 'admin'],
|
||||||
|
},
|
||||||
|
'privileges': {
|
||||||
|
'users:edit:self:name': 'regular_user',
|
||||||
|
'users:edit:self:pass': 'regular_user',
|
||||||
|
'users:edit:self:email': 'regular_user',
|
||||||
|
'users:edit:self:rank': 'mod',
|
||||||
|
|
||||||
|
'users:edit:any:name': 'mod',
|
||||||
|
'users:edit:any:pass': 'mod',
|
||||||
|
'users:edit:any:email': 'mod',
|
||||||
|
'users:edit:any:rank': 'admin',
|
||||||
|
}
|
||||||
|
}
|
||||||
|
password_service = szurubooru.services.PasswordService(config)
|
||||||
|
auth_service = szurubooru.services.AuthService(config, password_service)
|
||||||
|
user_service = szurubooru.services.UserService(config, password_service)
|
||||||
|
self.auth_service = auth_service
|
||||||
|
self.api = UserDetailApi(
|
||||||
|
config, auth_service, password_service, user_service)
|
||||||
|
self.context = dotdict()
|
||||||
|
self.context.session = self.session
|
||||||
|
self.context.request = {}
|
||||||
|
self.request = dotdict()
|
||||||
|
self.request.context = self.context
|
||||||
|
|
||||||
|
def _create_user(self, name, rank='admin'):
|
||||||
|
user = User()
|
||||||
|
user.name = name
|
||||||
|
user.password = 'dummy'
|
||||||
|
user.password_salt = 'dummy'
|
||||||
|
user.password_hash = 'dummy'
|
||||||
|
user.email = 'dummy'
|
||||||
|
user.access_rank = rank
|
||||||
|
user.creation_time = datetime.now()
|
||||||
|
user.avatar_style = User.AVATAR_GRAVATAR
|
||||||
|
return user
|
||||||
|
|
||||||
|
def test_updating_nothing(self):
|
||||||
|
admin_user = self._create_user('u1', 'admin')
|
||||||
|
self.session.add(admin_user)
|
||||||
|
self.context.user = admin_user
|
||||||
|
self.api.put(self.request, self.context, 'u1')
|
||||||
|
admin_user = self.session.query(User).filter_by(name='u1').one()
|
||||||
|
self.assertEqual(admin_user.name, 'u1')
|
||||||
|
self.assertEqual(admin_user.email, 'dummy')
|
||||||
|
self.assertEqual(admin_user.access_rank, 'admin')
|
||||||
|
|
||||||
|
def test_admin_updating_everything_for_themselves(self):
|
||||||
|
admin_user = self._create_user('u1', 'admin')
|
||||||
|
self.session.add(admin_user)
|
||||||
|
self.context.user = admin_user
|
||||||
|
self.context.request = {
|
||||||
|
'name': 'chewie',
|
||||||
|
'email': 'asd@asd.asd',
|
||||||
|
'password': 'valid',
|
||||||
|
'accessRank': 'mod',
|
||||||
|
}
|
||||||
|
self.api.put(self.request, self.context, 'u1')
|
||||||
|
admin_user = self.session.query(User).filter_by(name='chewie').one()
|
||||||
|
self.assertEqual(admin_user.name, 'chewie')
|
||||||
|
self.assertEqual(admin_user.email, 'asd@asd.asd')
|
||||||
|
self.assertEqual(admin_user.access_rank, 'mod')
|
||||||
|
self.assertTrue(self.auth_service.is_valid_password(admin_user, 'valid'))
|
||||||
|
self.assertFalse(self.auth_service.is_valid_password(admin_user, 'invalid'))
|
||||||
|
|
||||||
|
def test_removing_email(self):
|
||||||
|
admin_user = self._create_user('u1', 'admin')
|
||||||
|
self.session.add(admin_user)
|
||||||
|
self.context.user = admin_user
|
||||||
|
self.context.request = {'email': ''}
|
||||||
|
self.api.put(self.request, self.context, 'u1')
|
||||||
|
admin_user = self.session.query(User).filter_by(name='u1').one()
|
||||||
|
self.assertEqual(admin_user.email, None)
|
||||||
|
|
||||||
|
def test_invalid_inputs(self):
|
||||||
|
admin_user = self._create_user('u1', 'admin')
|
||||||
|
self.session.add(admin_user)
|
||||||
|
self.context.user = admin_user
|
||||||
|
self.context.request = {'name': '.'}
|
||||||
|
self.assertRaises(
|
||||||
|
ValidationError, self.api.put, self.request, self.context, 'u1')
|
||||||
|
self.context.request = {'password': '.'}
|
||||||
|
self.assertRaises(
|
||||||
|
ValidationError, self.api.put, self.request, self.context, 'u1')
|
||||||
|
self.context.request = {'accessRank': '.'}
|
||||||
|
self.assertRaises(
|
||||||
|
ValidationError, self.api.put, self.request, self.context, 'u1')
|
||||||
|
self.context.request = {'email': '.'}
|
||||||
|
self.assertRaises(
|
||||||
|
ValidationError, self.api.put, self.request, self.context, 'u1')
|
||||||
|
|
||||||
|
def test_user_trying_to_update_someone_else(self):
|
||||||
|
user1 = self._create_user('u1', 'regular_user')
|
||||||
|
user2 = self._create_user('u2', 'regular_user')
|
||||||
|
self.session.add_all([user1, user2])
|
||||||
|
self.context.user = user1
|
||||||
|
for request in [
|
||||||
|
{'name': 'whatever'},
|
||||||
|
{'email': 'whatever'},
|
||||||
|
{'accessRank': 'whatever'},
|
||||||
|
{'password': 'whatever'}]:
|
||||||
|
self.context.request = request
|
||||||
|
self.assertRaises(
|
||||||
|
AuthError, self.api.put, self.request, self.context, user2.name)
|
||||||
|
|
||||||
|
def test_user_trying_to_become_someone_else(self):
|
||||||
|
user1 = self._create_user('u1', 'regular_user')
|
||||||
|
user2 = self._create_user('u2', 'regular_user')
|
||||||
|
self.session.add_all([user1, user2])
|
||||||
|
self.context.user = user1
|
||||||
|
self.context.request = {'name': 'u2'}
|
||||||
|
self.assertRaises(
|
||||||
|
ValidationError, self.api.put, self.request, self.context, 'u1')
|
||||||
|
|
||||||
|
def test_mods_trying_to_become_admin(self):
|
||||||
|
user1 = self._create_user('u1', 'mod')
|
||||||
|
user2 = self._create_user('u2', 'mod')
|
||||||
|
self.session.add_all([user1, user2])
|
||||||
|
self.context.user = user1
|
||||||
|
self.context.request = {'accessRank': 'admin'}
|
||||||
|
self.assertRaises(
|
||||||
|
AuthError, self.api.put, self.request, self.context, user1.name)
|
||||||
|
self.assertRaises(
|
||||||
|
AuthError, self.api.put, self.request, self.context, user2.name)
|
|
@ -4,8 +4,12 @@ import datetime
|
||||||
import re
|
import re
|
||||||
from szurubooru.errors import ValidationError
|
from szurubooru.errors import ValidationError
|
||||||
|
|
||||||
|
def is_valid_email(email):
|
||||||
|
''' Validates given email address. '''
|
||||||
|
return not email or re.match('^[^@]*@[^@]*\.[^@]*$', email)
|
||||||
|
|
||||||
class dotdict(dict): # pylint: disable=invalid-name
|
class dotdict(dict): # pylint: disable=invalid-name
|
||||||
'''dot.notation access to dictionary attributes'''
|
''' dot.notation access to dictionary attributes. '''
|
||||||
def __getattr__(self, attr):
|
def __getattr__(self, attr):
|
||||||
return self.get(attr)
|
return self.get(attr)
|
||||||
__setattr__ = dict.__setitem__
|
__setattr__ = dict.__setitem__
|
||||||
|
|
Loading…
Reference in a new issue