Stashr
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
stashr/stashr/routes.py

718 lines
20 KiB

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Stashr - Flask Routing Definitions
"""
"""
MIT License
Copyright (c) 2020 Andrew Vanderbye
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
"""-------------------------------------------------------------------------------------------
-- IMPORTS
-------------------------------------------------------------------------------------------"""
""" --- HUEY IMPORT --- """
""" --- PYTHON IMPORTS --- """
import math, os, json
from natsort import natsorted
""" --- STASHR DEPENDENCY IMPORTS --- """
""" --- STASHR CORE IMPORTS --- """
from stashr import log, database, forms, config, utils, folders, stashr
from stashr.stashr import app, pm
# from stashr.stashr import permission_admin, permission_reader
from stashr.config import stashrconfig
""" --- FLASK IMPORTS --- """
from flask import flash, redirect, url_for, render_template, current_app, send_from_directory, request, send_file, jsonify
""" --- FLASK EXTENSION IMPORTS --- """
from flask_login import login_user, current_user, login_required, logout_user
from flask_bcrypt import check_password_hash, generate_password_hash
# from flask_principal import RoleNeed, Identity, AnonymousIdentity, identity_changed, identity_loaded
from sqlalchemy import or_
""" --- CREATE LOGGER --- """
logger = log.stashr_logger(__name__)
"""-------------------------------------------------------------------------------------------
-- FLASK-PRINCIPAL IDENTITY LOADING
-------------------------------------------------------------------------------------------"""
"""
@identity_loaded.connect_via(app)
def on_identity_loaded(sender, identity):
try:
if identity.id is not None:
user = f'user_{identity.id}'
identity.provides.add(RoleNeed(database.session.query(database.Users).get(int(identity.id)).role))
except:
logout_user()
identity_changed.send(current_app._get_current_object(), identity=AnonymousIdentity())
"""
"""-------------------------------------------------------------------------------------------
-- USERS AND AUTHENTICATION
-------------------------------------------------------------------------------------------"""
@app.login_manager.user_loader
def load_user(user_id):
return database.session.query(database.Users).filter(database.Users.id == int(user_id)).first()
def verify_password(username, password):
if check_password_hash(database.session.query(database.Users).filter(
(database.Users.username == username) | (database.Users.email == username)).first().password, password):
return True
return False
"""-------------------------------------------------------------------------------------------
-- ROUTES
-------------------------------------------------------------------------------------------"""
""" --- PUBLIC PAGES --- """
"""
@app.route('/samplepost', methods=['POST'])
def test_post():
print('new post')
data = request.json['data']
print(type(data))
for item in data:
print('itemm')
return 'POST PAGE'
"""
# INDEX
@app.route('/', methods=['GET'])
def index_page():
if stashrconfig['APP']['first_run']:
flash('First Run')
return redirect(url_for('first_run_page'))
return render_template(
'index_page.html',
title='Home',
open_registration=stashrconfig['APP']['open_registration']
)
# LOGIN
@app.route('/login', methods=['GET', 'POST'])
def login_page():
if current_user.is_authenticated:
flash('User Logged In', 'Info')
return redirect(url_for('index_page'))
login_form = forms.login_form()
if login_form.validate_on_submit() and login_form.login_button.data:
if verify_password(login_form.username.data, login_form.password.data):
user = database.session \
.query(database.Users) \
.filter((database.Users.username == login_form.username.data)|
(database.Users.email == login_form.username.data)) \
.first()
login_user(user, remember=login_form.remember_me.data)
# identity_changed.send(current_app._get_current_object(), identity=Identity(user.id))
flash('Logged In', 'success')
return redirect(url_for('index_page'))
else:
flash('Incorrect Username or Password', 'error')
return redirect(url_for('login_page'))
return render_template(
'login_page.html',
login_form=login_form,
title='Login',
open_registration=stashrconfig['APP']['open_registration']
)
# REGISTER
@app.route('/register', methods=['GET', 'POST'])
def register_page():
if not stashrconfig['APP']['open_registration']:
flash('Registration Closed', 'error')
return redirect(url_for('login_page'))
registration_form = forms.registration_form()
if registration_form.validate_on_submit() and registration_form.register_button.data:
utils.register_new_user(registration_form)
return redirect(url_for('login_page'))
return render_template(
'register_page.html',
registration_form=registration_form,
title='Register',
open_registration=stashrconfig['APP']['open_registration']
)
# FORGOT PASSWORD
@app.route('/forgot', methods=['GET', 'POST'])
def forgot_page():
if current_user.is_authenticated:
flash('User Logged In', 'error')
return redirect(url_for('index_page'))
forgot_password_form = forms.forgot_password_form()
if forgot_password_form.validate_on_submit() and forgot_password_form.forgot_button.data:
# utils.send_password_change_request(forgot_password_form)
flash('Email Reminder Sent', 'info')
return redirect(url_for('index_page'))
return render_template(
'forgot_password_page.html',
forgot_password_form=forgot_password_form,
title='Forgot Password',
open_registration=stashrconfig['APP']['open_registration']
)
""" --- LOGOUT --- """
# LOGOUT
@app.route('/logout')
@login_required
def logout_page():
logout_user()
# identity_changed.send(current_app._get_current_object(), identity=AnonymousIdentity())
flash('Logged Out', 'info')
return redirect(url_for('index_page'))
""" --- SETTINGS --- """
# SETTINGS
@app.route('/settings')
@login_required
def settings_page(section=None, user_id=None, user_function=None):
# if not permission_admin.require().can():
if current_user.role != 'admin':
flash('Permission Denied', 'error')
return redirect(url_for('index_page'))
return redirect(url_for('settings_app_page'))
# SETTINGS - APP
@app.route('/settings/app', methods=['GET', 'POST'])
def settings_app_page():
# if not permission_admin.require().can():
if current_user.role != 'admin':
flash('Permission Denied', 'error')
return redirect(url_for('index_page'))
settings_form = forms.update_app_settings_form()
if request.method == 'POST':
if settings_form.validate():
utils.update_app_settings(settings_form)
flash('App Settings Updated', 'success')
else:
print(settings_form.errors.items())
for error in settings_form.errors.items():
flash(f'{error[0]}: {error[1]}', 'error')
return render_template(
'settings_page_app.html',
title='App Settings',
settings_form=settings_form
)
# SETTINGS - DIRECTORIES
@app.route('/settings/directories', methods=['GET', 'POST'])
def settings_directories_page():
directories_form = forms.update_directory_settings_form()
if request.method == 'POST':
if directories_form.validate():
utils.update_directory_settings(directories_form)
flash('Directory Settings Updated', 'success')
else:
print(directories_form.errors.items())
for error in directories_form.errors.items():
flash(f'{error[0]}: {error[1]}', 'error')
return render_template(
'settings_page_directories.html',
title='Directory Settings',
directories_form=directories_form
)
# SETTINGS - MAIL
@app.route('/settings/mail', methods=['GET', 'POST'])
def settings_mail_page():
# if not permission_admin.require().can():
if current_user.role != 'admin':
flash('Permission Denied', 'error')
return redirect(url_for('index_page'))
mail_form = forms.update_mail_settings_form()
if request.method == 'POST':
if mail_form.validate():
utils.update_mail_settings(mail_form)
flash('Mail Settings Updated', 'success')
else:
for error in mail_form.errors.items():
flash(f'{error[0]}: {error[1]}', 'error')
return render_template(
'settings_page_mail.html',
title='Mail Settings',
mail_form=mail_form
)
# SETTINGS - TASKS
@app.route('/settings/tasks', methods=['GET', 'POST'])
def settings_tasks_page():
# if not permission_admin.require().can():
if current_user.role != 'admin':
flash('Permission Denied', 'error')
return redirect(url_for('index_page'))
return render_template(
'settings_page_tasks.html',
title='Task Settings',
)
# SETTINGS - ALL USERS
@app.route('/settings/users', methods=['GET', 'POST'])
def settings_all_users_page():
# if not permission_admin.require().can():
if current_user.role != 'admin':
flash('Permission Denied', 'error')
return redirect(url_for('index_page'))
users = database.session.query(database.Users).all()
return render_template(
'settings_page_all_users.html',
title='Users',
ratings_dict=database.ratings_dict,
users=users,
)
# SETTINGS - SINGLE USER
@app.route('/settings/users/<user_id>', methods=['GET', 'POST'])
def settings_single_user_page(user_id):
"""
if not permission_admin.require().can():
flash('Permission Denied', 'error')
return redirect(url_for('index_page'))
"""
"""
if current_user.id != user_id:
return redirect(url_for('settings_all_users_page'))
"""
user = database.session.query(database.Users).get(int(user_id))
change_password_form = forms.change_password_form(user)
reset_password_form = forms.reset_password_form(user)
delete_user_form = forms.delete_user_form(user)
edit_user_form = forms.edit_user_form(user)
if request.method == 'POST':
if change_password_form.update_password_button.data:
if change_password_form.validate():
utils.change_user_password(change_password_form)
flash('Password Updated','success')
else:
for error in change_password_form.errors.items():
flash(f'{error[0]}: {error[1]}', 'error')
if reset_password_form.reset_password_button.data:
if reset_password_form.validate() and current_user.role == 'admin':
utils.reset_user_password(reset_password_form)
flash('Password Reset', 'success')
else:
for error in reset_password_form.errors.items():
flash(f'{error[0]}: {error[1]}', 'error')
if delete_user_form.delete_user_button.data:
if delete_user_form.validate() and current_user.role == 'admin':
utils.delete_user_account(delete_user_form)
flash('User Deleted', 'success')
return redirect(url_for('settings_all_users_page'))
else:
flash(current_user.role,'error')
if edit_user_form.edit_user_button.data:
if edit_user_form.validate():
utils.edit_user_account(edit_user_form)
flash('User Account Edited', 'success')
else:
for error in edit_user_form.errors.items():
flash(f'{error[0]}: {error[1]}', 'error')
return render_template(
'settings_page_single_user.html',
user=user,
change_password_form=change_password_form,
reset_password_form = reset_password_form,
delete_user_form = delete_user_form,
edit_user_form = edit_user_form,
title='User Information',
user_id=user_id
)
# SETTINGS - NEW USER
@app.route('/settings/newuser', methods=['GET', 'POST'])
def settings_new_user_page():
# if not permission_admin.require().can():
if current_user.role != 'admin':
flash('Permission Denied', 'error')
return redirect(url_for('index_page'))
new_user_form = forms.new_user_form()
if request.method == 'POST':
if new_user_form.validate():
utils.create_new_user(new_user_form)
flash('User Created', 'success')
return redirect(url_for('settings_all_users_page'))
else:
for error in new_user_form.errors.items():
flash(f'{error[0]}: {error[1]}', 'error')
return render_template(
'settings_page_new_user.html',
title='New User',
new_user_form=new_user_form
)
# SETTINGS - STATS
@app.route('/settings/stats', methods=['GET', 'POST'])
def settings_stats_page():
# if not permission_admin.require().can():
if current_user.role != 'admin':
flash('Permission Denied', 'error')
return redirect(url_for('index_page'))
return 'STATS SETTINGS PAGE'
# SETTINGS - LOG
@app.route('/settings/log', methods=['GET', 'POST'])
def settings_log_page():
# if not permission_admin.require().can():
if current_user.role != 'admin':
flash('Permission Denied', 'error')
return redirect(url_for('index_page'))
return 'LOG SETTINGS PAGE'
# SETTINGS - PLUGINS
@app.route('/settings/plugins', methods=['GET', 'POST'])
def settings_plugins_page():
# if not permission_admin.require().can():
if current_user.role != 'admin':
flash('Permission Denied', 'error')
return redirect(url_for('index_page'))
plugins = pm.get_all_plugins
return render_template(
'settings_page_plugins.html',
title='Plugins',
plugins=plugins
)
""" --- FIRST RUN --- """
# FIRST RUN
@app.route('/firstrun', methods=['GET', 'POST'])
def first_run_page():
if current_user.is_authenticated:
flash('App Configured', 'info')
return redirect(url_for('index_page'))
if not stashrconfig['APP']['first_run']:
flash('App Configured', 'info')
return redirect(url_for('index_page'))
first_run_form = forms.app_first_run_form()
if request.method == 'POST' and first_run_form.first_run_button.data:
utils.complete_first_run(first_run_form)
flash('Setup Complete', 'success')
return redirect(url_for('index_page'))
return render_template(
'first_run_page.html',
title='Configuration',
first_run_form=first_run_form,
)
""" --- COMIC PAGES --- """
# ALL VOLUMES
@app.route('/volumes', methods=['GET'])
@login_required
def all_volumes_page():
return render_template(
'all_volumes_page.html',
title='All Volumes',
)
# SINGLE VOLUME
@app.route('/volumes/<volume_id>', methods=['GET'])
@login_required
def single_volume_page(volume_id):
volume = database.session \
.query(database.Volumes) \
.filter(or_(database.Volumes.volume_id == volume_id,
database.Volumes.volume_slug == volume_id)) \
.filter(database.Volumes.volume_age_rating <= current_user.rating_allowed) \
.first()
if volume is None:
flash('Volume Not Found', 'error')
return redirect(url_for('all_volumes_page'))
return render_template(
'single_volume_page.html',
title=volume.volume_name,
volume_id=volume.volume_id,
volume_slug=volume.volume_slug
)
# ALL PUBLISHERS PAGE
@app.route('/publishers')
@login_required
def all_publishers_page():
return render_template(
'all_publishers_page.html',
title='All Publishers',
)
# SINGLE PUBLISHERS PAGE
@app.route('/publishers/<publisher_id>')
@login_required
def single_publisher_page(publisher_id):
publisher = database.session \
.query(database.Publishers) \
.filter(database.Publishers.publisher_id == publisher_id) \
.first()
if publisher is None:
flash('Publisher Not Found', 'error')
return redirect(url_for('all_publishers_page'))
return render_template(
'single_publisher_page.html',
title=publisher.publisher_name,
publisher_id=publisher_id,
)
""" --- READING COMIC PAGES --- """
# READ ISSUE PAGE
@app.route('/read/<issue_id>')
@login_required
def read_issue_page(issue_id):
issue = database.session \
.query(database.Issues) \
.filter(database.Issues.issue_id == issue_id) \
.first()
print(issue.__dict__)
if issue.volume.volume_age_rating > current_user.rating_allowed:
flash('Access Denied', 'error')
return redirect(url_for('all_volumes_page'))
return render_template(
'read_issue_page.html',
title='Read Issue',
issue_id=issue_id,
comic_name=f'{issue.volume.volume_name} - #{issue.issue_number}'
)
""" --- NEW RELEASES --- """
# NEW RELEASES
@app.route('/releases')
@login_required
def new_releases_page():
return render_template(
'new_releases_page.html',
title='New Releases',
)
""" --- READING LIST --- """
# READING LIST
@app.route('/readinglist')
@login_required
def reading_list_page():
return render_template(
'reading_list_page.html',
title='Reading List',
)
""" --- FOLDERS --- """
@app.route('/scrape')
@login_required
def scrape_folders_page():
return render_template(
'scrape_page.html',
title="Scrape"
)
""" --- COLLECTIONS --- """
# ALL COLLECTIONS
@app.route('/collections')
@login_required
def all_collections_page():
return render_template(
'all_collections_page.html',
title='Collections',
)
# SINGLE COLLECTIONS
@app.route('/collections/<collection_slug>', methods=['GET'])
@login_required
def single_collection_page(collection_slug):
collection = database.session \
.query(database.Collections) \
.filter(database.Collections.collection_slug == collection_slug) \
.first()
if collection is None:
flash('Collection Not Found', 'error')
return redirect(url_for('all_collections_page'))
return render_template(
'single_collection_page.html',
title=collection.collection_name,
collection_slug = collection_slug,
collection=collection,
)
""" --- SEARCH --- """
# SEARCH PAGE
@app.route('/search', methods=['GET'])
@login_required
def search_page():
return render_template(
'search_page.html',
title='Search',
)
""" --- CUSTOM ROUTING --- """
# CUSTOM STATIC COVER
@app.route('/images/<foldername>/<path:filename>')
@login_required
def custom_image_static(foldername, filename):
path = stashrconfig['DIRECTORY']['images']
return send_from_directory(
os.path.join(
# folders.base_path,
folders.StashrPaths().base_path(),
path,
foldername
),
filename
)
@app.route('/service-worker.js')
def custom_service_worker():
return app.send_static_file('js/service-worker.js')
"""-------------------------------------------------------------------------------------------
-- DEVELOPMENT
-------------------------------------------------------------------------------------------"""
if __name__=='__main__':
logger.warning('Starting Development Server')
app.run(host='0.0.0.0', port=5002, debug=True)