cfone/app/base/models.py
2021-09-22 23:46:35 -04:00

263 lines
9.1 KiB
Python

# -*- encoding: utf-8 -*-
"""
Copyright (c) 2019 - present AppSeed.us
"""
from flask_login import UserMixin
from sqlalchemy import LargeBinary, Column, Integer, String
from datetime import datetime,timedelta
from time import time
from hashlib import md5
import json
# from flask import current_app, request, url_for
from app import db, login_manager
from app.base.util import hash_pass
# Association table pairing a group (``groups.group_id``) with a node
# (``nodes.id``). It is the ``secondary`` table behind ``Group.access_to``
# and ``Node.accessed_by``; both columns together form the primary key.
ws_node = db.Table(
    "ws_node",
    # db.metadata,
    db.Column(
        "group_id", db.Integer, db.ForeignKey("groups.group_id"), primary_key=True
    ),
    db.Column("id", db.Integer, db.ForeignKey("nodes.id"), primary_key=True),
)
# Association table pairing a group (``groups.group_id``) with a user
# (``users.id``). It is the ``secondary`` table behind ``User.member_of``
# and its ``members`` backref; both columns together form the primary key.
ws_user = db.Table(
    "ws_user",
    # db.metadata,
    db.Column(
        "group_id", db.Integer, db.ForeignKey("groups.group_id"), primary_key=True
    ),
    db.Column("user_id", db.Integer, db.ForeignKey("users.id"), primary_key=True),
)
# Node-to-node link table; both columns reference ``nodes.id``.
# ``Node.parents`` joins the owning node on ``src_id`` and the related node
# on ``dest`` (the reverse direction is exposed as the ``children`` backref).
edges = db.Table(
    "edges",
    # db.metadata,
    db.Column("src_id", db.Integer, db.ForeignKey("nodes.id"), primary_key=True),
    db.Column("dest", db.Integer, db.ForeignKey("nodes.id"), primary_key=True),
)
# roles_users = db.Table(
# "roles_users",
# db.Column("user_id", db.Integer(), db.ForeignKey("users.id")),
# db.Column("role_id", db.Integer(), db.ForeignKey("role.id")),
# )
# Self-referential user-to-user link table backing ``User.followed``:
# ``follower_id`` is the user who follows, ``followed_id`` the user being
# followed. No primary key is declared on this table.
followers = db.Table(
    "followers",
    db.Column("follower_id", db.Integer, db.ForeignKey('users.id')),
    db.Column("followed_id", db.Integer, db.ForeignKey('users.id'))
)
class User(db.Model, UserMixin):
    """Account row in the ``users`` table, usable as a Flask-Login user.

    Besides its plain columns the model exposes group membership
    (``member_of``), owned nodes (``user_nodes``), a self-referential follow
    graph (``followed`` / ``followers``), sent and received messages,
    notifications and tasks.
    """

    __tablename__ = 'users'

    id = db.Column(db.Integer, primary_key=True)
    email = db.Column(db.String(255), unique=True)
    username = db.Column(db.String(50), nullable=True)
    # NOTE(review): stored exactly as handed to __init__; nothing in this
    # class hashes it -- confirm that callers hash before constructing a User.
    password = db.Column(db.TEXT)
    has_active_sub = db.Column(db.Integer)
    active = db.Column(db.Boolean())
    created_date = db.Column(db.DateTime, nullable=False, default=datetime.utcnow)
    last_seen = db.Column(db.DateTime(), nullable=False, default=datetime.utcnow)
    # roles = db.relationship(
    #     "Role", secondary=roles_users, backref=db.backref("users", lazy="dynamic")
    # )
    avatar_hash = db.Column(db.String(32))

    # Groups this user belongs to (through ``ws_user``); the backref gives
    # every Group a ``members`` list.
    member_of = db.relationship(
        "Group",
        secondary=ws_user,
        backref="members",
        lazy="select",
    )
    # Nodes that reference this user; each Node gets a ``node_user`` backref.
    user_nodes = db.relationship("Node", backref="node_user")

    # extra
    # Users this user follows. ``followers`` (the backref) is the reverse
    # direction; both sides are dynamic queries rather than loaded lists.
    followed = db.relationship(
        'User', secondary=followers,
        primaryjoin=(followers.c.follower_id == id),
        secondaryjoin=(followers.c.followed_id == id),
        backref=db.backref('followers', lazy='dynamic'), lazy='dynamic')
    messages_sent = db.relationship('Message',
                                    foreign_keys='Message.sender_id',
                                    backref='author', lazy='dynamic')
    messages_received = db.relationship('Message',
                                        foreign_keys='Message.recipient_id',
                                        backref='recipient', lazy='dynamic')
    last_message_read_time = db.Column(db.DateTime)
    notifications = db.relationship('Notification', backref='user',
                                    lazy='dynamic')
    tasks = db.relationship('Task', backref='user', lazy='dynamic')

    def __init__(self, username, email, password):
        """Create a user from the three required credentials.

        The values are assigned as given; no validation or hashing happens
        here.
        """
        self.username = username
        self.email = email
        self.password = password
        # self.roles = roles

    def __repr__(self):
        # NOTE(review): dumps the whole instance dict, which includes the
        # ``password`` attribute -- be careful where this ends up being logged.
        return "{}({!r})".format(self.__class__.__name__, self.__dict__)

    def avatar(self, size):
        """Return the Gravatar URL for this user's e-mail at ``size`` pixels.

        Gravatar identifies avatars by the MD5 hex digest of the lower-cased
        address; ``d=identicon`` asks for a generated image when none exists.
        """
        digest = md5(self.email.lower().encode('utf-8')).hexdigest()
        return 'https://www.gravatar.com/avatar/{}?d=identicon&s={}'.format(
            digest, size)

    def follow(self, user):
        """Start following ``user``; does nothing if already following."""
        if not self.is_following(user):
            self.followed.append(user)

    def unfollow(self, user):
        """Stop following ``user``; does nothing if not currently following."""
        if self.is_following(user):
            self.followed.remove(user)

    def is_following(self, user):
        """Return True when ``user`` is among the users this user follows."""
        # Compare against the *argument's* id. The previous code referenced an
        # undefined name ``users`` and raised NameError on every call.
        return self.followed.filter(
            followers.c.followed_id == user.id).count() > 0

    def followed_posts(self):
        """Return a query of nodes owned by followed users or by this user.

        Results are ordered newest first by ``Node.created_date``.
        """
        # Node has no ``user_id`` / ``timestamp`` attributes; ownership is
        # ``owner_id`` and the creation time is ``created_date``.
        followed = Node.query.join(
            followers, (followers.c.followed_id == Node.owner_id)).filter(
                followers.c.follower_id == self.id)
        own = Node.query.filter_by(owner_id=self.id)
        return followed.union(own).order_by(Node.created_date.desc())


# The two Flask-Login callbacks below take no ``self``, so they are registered
# as plain module-level functions rather than living among User's members.
@login_manager.user_loader
def user_loader(id):
    """Flask-Login callback: return the User whose primary key is ``id``.

    Returns None when no such row exists.
    """
    return User.query.filter_by(id=id).first()


@login_manager.request_loader
def request_loader(request):
    """Flask-Login callback: resolve a user from the request's form data.

    Looks up the ``username`` form field and returns the matching User, or
    None when there is no match.
    """
    # NOTE(review): this trusts the submitted ``username`` alone -- no
    # password or token is checked here. Confirm that this is intended.
    username = request.form.get('username')
    return User.query.filter_by(username=username).first()
class Node(db.Model):
    """A post stored in the ``nodes`` table.

    Carries the post's name, description and budget, its owning user, the
    comments and bids attached to it, links to other nodes through the
    ``edges`` table, and the groups allowed to access it.
    """

    __tablename__ = "nodes"

    id = db.Column(db.Integer, primary_key=True)
    created_date = db.Column(db.DateTime, nullable=False, default=datetime.utcnow)
    post_name = db.Column(db.String)
    post_desc = db.Column(db.String)
    post_budget = db.Column(db.Integer)

    # Owning user; ``owner_name`` resolves to the User object, not a string.
    owner_id = db.Column(db.Integer, db.ForeignKey("users.id"))
    owner_name = db.relationship("User", foreign_keys=[owner_id])

    # Comments and bids attached to this node; each side gets ``p_node``.
    n_comments = db.relationship("Comment", backref="p_node")
    n_bids = db.relationship("Bid", backref="p_node")

    # Self-referential link through ``edges``. ``uselist=False`` makes this
    # attribute a single Node (or None) rather than a list.
    parents = db.relationship(
        "Node",
        secondary=edges,
        primaryjoin="Node.id==edges.c.src_id",
        secondaryjoin="Node.id==edges.c.dest",
        backref="children",
        uselist=False,
    )

    # Groups that may access this node; mirrors ``Group.access_to``.
    accessed_by = db.relationship(
        "Group", secondary=ws_node, back_populates="access_to"
    )

    def __repr__(self):
        """Return ``ClassName(<instance dict repr>)`` for debugging."""
        return "%s(%r)" % (self.__class__.__name__, self.__dict__)
def get_nodes_by_user(user_id, ws_id):
    """Return the nodes of group ``ws_id`` for a member with ``user_id``.

    Joins Node -> Group (via ``accessed_by``) -> User (via ``members``) and
    keeps rows matching both the user id and the group id. The result is a
    list of Node objects, empty when nothing matches.
    """
    accessible = (
        db.session.query(Node)
        .join(Node.accessed_by)
        .join(Group.members)
    )
    # Passing both criteria to one filter() ANDs them together.
    return accessible.filter(
        User.id == user_id,
        Group.group_id == ws_id,
    ).all()
def get_comments(post_id):
    """Return ``(node, bids, bid_count)`` for the node with id ``post_id``.

    ``node`` is None when no such node exists; ``bids`` is a list.
    """
    # NOTE(review): despite the name, this loads Bid rows, not Comment rows --
    # confirm which model callers actually expect.
    node = db.session.query(Node).filter(Node.id == post_id).first()
    bids = db.session.query(Bid).filter_by(post_id=post_id).all()
    return node, bids, len(bids)
class Bid(db.Model):
    """A bid placed by a user on a node; rows live in the ``bids`` table."""

    __tablename__ = "bids"

    bid_id = db.Column(db.Integer, primary_key=True, autoincrement=True)

    # Bidding user; ``owner_name`` resolves to the User object, not a string.
    owner_id = db.Column(db.Integer, db.ForeignKey("users.id"))
    owner_name = db.relationship("User", foreign_keys=[owner_id])

    bid_note = db.Column(db.String(255))
    bid_amount = db.Column(db.Integer)
    created_date = db.Column(db.DateTime, nullable=False, default=datetime.utcnow)

    # Node this bid was placed on.
    post_id = db.Column(db.Integer, db.ForeignKey("nodes.id"))

    def __repr__(self):
        """Return ``ClassName(<instance dict repr>)`` for debugging."""
        return "%s(%r)" % (self.__class__.__name__, self.__dict__)
class Group(db.Model):
    """A named group of users with access to a set of nodes (``groups``)."""

    __tablename__ = "groups"

    group_id = db.Column(db.Integer, primary_key=True)

    # owner_id = db.Column(db.Integer)
    # Owning user; ``owner_name`` resolves to the User object, not a string.
    owner_id = db.Column(db.Integer, db.ForeignKey("users.id"))
    owner_name = db.relationship("User", foreign_keys=[owner_id])

    created_date = db.Column(db.DateTime, nullable=False, default=datetime.utcnow)
    ws_name = db.Column(db.String)

    # Nodes this group may access; mirrors ``Node.accessed_by``.
    access_to = db.relationship("Node", secondary=ws_node, back_populates="accessed_by")

    def __repr__(self):
        """Return ``ClassName(<instance dict>)`` for debugging."""
        return "%s(%s)" % (self.__class__.__name__, self.__dict__)
def get_ws_by_user(user_id):
    """Return the list of groups whose members include user ``user_id``."""
    member_groups = db.session.query(Group).join(Group.members)
    return member_groups.filter(User.id == user_id).all()
class Comment(db.Model):
    """A text comment left by a user on a node; rows live in ``comments``."""

    __tablename__ = 'comments'
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    # Commenting user; ``owner_name`` resolves to the User object, not a string.
    owner_id = db.Column(db.Integer, db.ForeignKey('users.id'))
    owner_name = db.relationship("User", foreign_keys=[owner_id])
    # Node the comment belongs to (reachable as ``p_node`` via Node.n_comments).
    post_id = db.Column(db.Integer, db.ForeignKey('nodes.id'))
    message = db.Column(db.String(255))
    created_date = db.Column(db.DateTime, nullable=False,default=datetime.utcnow)
# Extra models: user-to-user messages, notifications, and tasks.
class Message(db.Model):
    """A short message (at most 140 characters) from one user to another."""

    id = db.Column(db.Integer, primary_key=True)

    # Both ends reference ``users.id``; User exposes them through its
    # ``messages_sent`` / ``messages_received`` relationships.
    sender_id = db.Column(db.Integer, db.ForeignKey('users.id'))
    recipient_id = db.Column(db.Integer, db.ForeignKey('users.id'))

    body = db.Column(db.String(140))
    timestamp = db.Column(db.DateTime, index=True, default=datetime.utcnow)

    def __repr__(self):
        """Return ``<Message BODY>`` for debugging."""
        # One-element tuple so a tuple-valued body cannot be mistaken for
        # the formatting arguments.
        return '<Message %s>' % (self.body,)
class Notification(db.Model):
    """A named notification for a user, carrying a JSON-encoded payload."""

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(128), index=True)
    user_id = db.Column(db.Integer, db.ForeignKey('users.id'))
    # Defaults to ``time()`` at insert, i.e. a float of seconds since the epoch.
    timestamp = db.Column(db.Float, index=True, default=time)
    payload_json = db.Column(db.Text)

    def get_data(self):
        """Decode ``payload_json`` and return the resulting Python object."""
        raw_payload = str(self.payload_json)
        return json.loads(raw_payload)
class Task(db.Model):
    """A job record for a user; ``id`` is also used to fetch the RQ job."""

    id = db.Column(db.String(36), primary_key=True)
    name = db.Column(db.String(128), index=True)
    description = db.Column(db.String(128))
    user_id = db.Column(db.Integer, db.ForeignKey('users.id'))
    complete = db.Column(db.Boolean, default=False)

    def get_rq_job(self):
        """Fetch the RQ job whose id equals this task's ``id``.

        Returns None when Redis raises an error or the job does not exist.
        """
        # NOTE(review): ``rq``, ``redis`` and ``current_app`` are not imported
        # in the import block visible at the top of this file -- confirm they
        # are in scope, otherwise this raises NameError when called.
        try:
            return rq.job.Job.fetch(self.id, connection=current_app.redis)
        except (redis.exceptions.RedisError, rq.exceptions.NoSuchJobError):
            return None

    def get_progress(self):
        """Return the job's ``progress`` meta value (0 when unset).

        A job that can no longer be fetched is reported as 100.
        """
        job = self.get_rq_job()
        if job is None:
            return 100
        return job.meta.get('progress', 0)