API module
backend.api
special
capacity
delete_capacities(capacity_id=None, session=Depends(get_session))
async
Delete a capacity
Parameters
capacity_id : str ID of the capacity that is to be removed from the database. session : Session SQL session that is to be used to delete the capacity. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\capacity.py
@router.delete("/")
async def delete_capacities(
capacity_id: str = None,
session: Session = Depends(get_session),
):
"""
Delete a capacity
Parameters
----------
capacity_id : str
ID of the capacity that is to be removed from the database.
session : Session
SQL session that is to be used to delete the capacity.
Defaults to creating a dependency on the running SQL model session.
"""
statement = select(Capacity).where(
Capacity.id == capacity_id,
)
capacity_to_delete = session.exec(statement).one()
session.delete(capacity_to_delete)
session.commit()
return True
get_capacities(session=Depends(get_session), is_locked=None, user_id=None, team_id=None, month=None, year=None)
async
Get list of all capacities.
Parameters
session : Session SQL session that is to be used to get a list of the epic areas. Defaults to creating a dependency on the running SQL model session. is_locked : bool Whether or not the capacity is locked or not. user_id : int User id of the user in question. team_id : int Team id of the user's team. month : int Month of capacity in question. year : int Year of capacity in question.
Source code in backend\api\capacity.py
@router.get("/")
async def get_capacities(
session: Session = Depends(get_session),
is_locked: bool = None,
user_id: int = None,
team_id: int = None,
month: int = None,
year: int = None,
):
"""
Get list of all capacities.
Parameters
----------
session : Session
SQL session that is to be used to get a list of the epic areas.
Defaults to creating a dependency on the running SQL model session.
is_locked : bool
Whether or not the capacity is locked or not.
user_id : int
User id of the user in question.
team_id : int
Team id of the user's team.
month : int
Month of capacity in question.
year : int
Year of capacity in question.
"""
statement = select(Capacity)
# Select capacity by user_id, team_id, month, year
if (user_id and team_id and month and year) != None:
statement = (
select(
Capacity.id.label("capacity_id"),
User.short_name.label("user_short_name"),
Team.short_name.label("team_short_name"),
Capacity.year,
Capacity.month,
Capacity.days,
)
.select_from(Capacity)
.join(User, Capacity.user_id == User.id)
.join(Team, Capacity.team_id == Team.id)
.where(Capacity.user_id == user_id)
.where(Capacity.team_id == team_id)
.where(Capacity.month == month)
.where(Capacity.year == year)
)
result = session.exec(statement).all()
return result
post_capacity(*, capacity, session=Depends(get_session))
async
Post new capacity.
Parameters
capacity : Capacity Capacity that is to be added to the database. session : Session SQL session that is to be used to add the capacity. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\capacity.py
@router.post("/")
async def post_capacity(*, capacity: Capacity, session: Session = Depends(get_session)):
"""
Post new capacity.
Parameters
----------
capacity : Capacity
Capacity that is to be added to the database.
session : Session
SQL session that is to be used to add the capacity.
Defaults to creating a dependency on the running SQL model session.
"""
statement = select(Capacity).where(
and_(
Capacity.user_id == capacity.user_id,
Capacity.team_id == capacity.team_id,
capacity.year == capacity.year,
Capacity.month == capacity.month,
)
)
try:
result = session.exec(statement).one()
return False
except NoResultFound:
session.add(capacity)
session.commit()
session.refresh(capacity)
return capacity
client
activate_clients(*, client_id, session=Depends(get_session))
async
Activate a client using its id as a key.
Parameters
client_id : int ID of the client to be activated. session : Session SQL session that is to be used to activate a client. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\client.py
@router.put("/{client_id}/activate")
async def activate_clients(
*,
client_id: int,
session: Session = Depends(get_session),
):
"""
Activate a client using its id as a key.
Parameters
----------
client_id : int
ID of the client to be activated.
session : Session
SQL session that is to be used to activate a client.
Defaults to creating a dependency on the running SQL model session.
"""
statement = select(Client).where(Client.id == client_id)
client_to_update = session.exec(statement).one()
client_to_update.is_active = True
client_to_update.updated_at = datetime.now()
session.add(client_to_update)
session.commit()
session.refresh(client_to_update)
return client_to_update
deactivate_clients(*, client_id, session=Depends(get_session))
async
Deactivate a client using its id as a key.
Parameters
client_id : int ID of the client to be deactivated. session : Session SQL session that is to be used to deactivate a client. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\client.py
@router.put("/{client_id}/deactivate")
async def deactivate_clients(
*,
client_id: int,
session: Session = Depends(get_session),
):
"""
Deactivate a client using its id as a key.
Parameters
----------
client_id : int
ID of the client to be deactivated.
session : Session
SQL session that is to be used to deactivate a client.
Defaults to creating a dependency on the running SQL model session.
"""
statement = select(Client).where(Client.id == client_id)
client_to_update = session.exec(statement).one()
client_to_update.is_active = False
client_to_update.updated_at = datetime.now()
session.add(client_to_update)
session.commit()
session.refresh(client_to_update)
return client_to_update
post_client(*, client, session=Depends(get_session))
async
Post a new client.
Parameters
client : Client Client that is to be added to the database. session : Session SQL session that is to be used to add the client. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\client.py
@router.post("/")
async def post_client(*, client: Client, session: Session = Depends(get_session)):
"""
Post a new client.
Parameters
----------
client : Client
Client that is to be added to the database.
session : Session
SQL session that is to be used to add the client.
Defaults to creating a dependency on the running SQL model session.
"""
statement = select(Client).where(Client.name == client.name)
try:
result = session.exec(statement).one()
return False
except NoResultFound:
session.add(client)
session.commit()
session.refresh(client)
return client
read_clients(*, client_id=None, session=Depends(get_session))
async
Get a client by client_id.
Parameters
client_id : int ID of client that is to be read. session : Session SQL session that is to be used to read a client. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\client.py
@router.get("/{client_id}")
async def read_clients(
*, client_id: int = None, session: Session = Depends(get_session)
):
"""
Get a client by client_id.
Parameters
----------
client_id : int
ID of client that is to be read.
session : Session
SQL session that is to be used to read a client.
Defaults to creating a dependency on the running SQL model session.
"""
statement = select(Client).where(Client.id == client_id)
try:
result = session.exec(statement).one()
return result
except NoResultFound:
msg = f"""There is no client with id = {client_id}"""
return msg
read_clients_by_name(*, name=None, session=Depends(get_session))
async
Get a client by client_name.
Parameters
name : str Name of client to be read. session : Session SQL session that is to be used to read a client. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\client.py
@router.get("/names/{name}")
async def read_clients_by_name(
*, name: str = None, session: Session = Depends(get_session)
):
"""
Get a client by client_name.
Parameters
----------
name : str
Name of client to be read.
session : Session
SQL session that is to be used to read a client.
Defaults to creating a dependency on the running SQL model session.
"""
statement = select(Client).where(Client.name == name)
result = session.exec(statement).one()
return result
read_clients_epics(client_id=None, session=Depends(get_session))
async
Get epics from a client_id.
Parameters
client_id : int ID of client that is to be used to pull epics from. session : Session SQL session that is to be used to pull the epics. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\client.py
@router.get("/{client_id}/epics/")
async def read_clients_epics(
client_id: int = None, session: Session = Depends(get_session)
):
"""
Get epics from a client_id.
Parameters
----------
client_id : int
ID of client that is to be used to pull epics from.
session : Session
SQL session that is to be used to pull the epics.
Defaults to creating a dependency on the running SQL model session.
"""
statement = (
select(Client.id, Client.name, Epic.name)
.select_from(Client)
.join(Epic)
.where(Client.id == client_id)
)
results = session.exec(statement).all()
return results
update_clients(*, client_id=None, new_client_name=None, session=Depends(get_session))
async
Update a client from a client_id.
Parameters
client_id : int ID of the client to update. new_client_name : str New name of the client. session : Session SQL session that is to be used to update a client. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\client.py
@router.put("/{client_id}/new-name")
async def update_clients(
*,
client_id: int = None,
new_client_name: str = None,
session: Session = Depends(get_session),
):
"""
Update a client from a client_id.
Parameters
----------
client_id : int
ID of the client to update.
new_client_name : str
New name of the client.
session : Session
SQL session that is to be used to update a client.
Defaults to creating a dependency on the running SQL model session.
"""
statement = select(Client).where(Client.id == client_id)
client_to_update = session.exec(statement).one()
client_to_update.name = new_client_name
client_to_update.updated_at = datetime.now()
session.add(client_to_update)
session.commit()
session.refresh(client_to_update)
return client_to_update
update_clients_and_epics(*, client_id, session=Depends(get_session))
async
Deactivate a client and its epics
Source code in backend\api\client.py
@router.put("/{client_id}/deactivate-epics")
async def update_clients_and_epics(
*,
client_id: int,
session: Session = Depends(get_session),
):
"""Deactivate a client and its epics"""
"""
Deactivate a client and its epics using the client's ID as a key.
Parameters
----------
client_id : int
ID of the client to deactivate.
session : Session
SQL session that is to be used to deactivate the client and its respective epics.
Defaults to creating a dependency on the running SQL model session.
"""
statement1 = select(Client).where(Client.id == client_id)
client_to_update = session.exec(statement1).one()
client_to_update.is_active = False
client_to_update.updated_at = datetime.now()
session.add(client_to_update)
statement2 = select(Epic).where(Epic.client_id == client_id)
epics_to_update = session.exec(statement2).all()
for epic in epics_to_update:
epic.is_active = False
session.add(epic)
session.commit()
return True
demand
delete_demands(demand_id=None, session=Depends(get_session))
async
Delete a demand.
Parameters
demand_id : str ID of the demand that is to be removed from the database. session : Session SQL session that is to be used to delete the demand. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\demand.py
@router.delete("/")
async def delete_demands(
demand_id: str = None,
session: Session = Depends(get_session),
):
"""
Delete a demand.
Parameters
----------
demand_id : str
ID of the demand that is to be removed from the database.
session : Session
SQL session that is to be used to delete the demand.
Defaults to creating a dependency on the running SQL model session.
"""
statement = select(Demand).where(
Demand.id == demand_id,
)
demand_to_delete = session.exec(statement).one()
session.delete(demand_to_delete)
session.commit()
return True
get_demands(session=Depends(get_session), is_locked=None, team_id=None, epic_id=None, month=None, year=None)
async
Get list of all demands.
Parameters
session : Session SQL session that is to be used to get a list of all of the demands. Defaults to creating a dependency on the running SQL model session. is_locked : bool Whether or not the demand is locked or not. team_id : int ID of the team to get the demand from. epic_id : int ID of the epic to get the demand from. month : int Month of the demand. year : int Year of the demand.
Source code in backend\api\demand.py
@router.get("/")
async def get_demands(
session: Session = Depends(get_session),
is_locked: bool = None,
team_id: int = None,
epic_id: int = None,
month: int = None,
year: int = None,
):
"""
Get list of all demands.
Parameters
----------
session : Session
SQL session that is to be used to get a list of all of the demands.
Defaults to creating a dependency on the running SQL model session.
is_locked : bool
Whether or not the demand is locked or not.
team_id : int
ID of the team to get the demand from.
epic_id : int
ID of the epic to get the demand from.
month : int
Month of the demand.
year : int
Year of the demand.
"""
statement = select(Demand)
# Select demand by epic_id, team_id, month, year
if (team_id and epic_id and month and year) != None:
statement = (
select(
Demand.id.label("demand_id"),
Team.short_name.label("team_short_name"),
Epic.short_name.label("epic_short_name"),
Demand.year,
Demand.month,
Demand.days,
)
.select_from(Demand)
.join(Team, Demand.team_id == Team.id)
.join(Epic, Demand.epic_id == Epic.id)
.where(Demand.team_id == team_id)
.where(Demand.epic_id == epic_id)
.where(Demand.month == month)
.where(Demand.year == year)
)
result = session.exec(statement).all()
return result
post_demand(*, demand, session=Depends(get_session))
async
Post a demand.
Parameters
demand : Demand Demand that is to be added to the database. session : Session SQL session that is to be used to add the demand. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\demand.py
@router.post("/")
async def post_demand(*, demand: Demand, session: Session = Depends(get_session)):
"""
Post a demand.
Parameters
----------
demand : Demand
Demand that is to be added to the database.
session : Session
SQL session that is to be used to add the demand.
Defaults to creating a dependency on the running SQL model session.
"""
statement = select(Demand).where(
and_(
Demand.team_id == demand.team_id,
Demand.epic_id == demand.epic_id,
Demand.year == demand.year,
Demand.month == demand.month,
)
)
try:
result = session.exec(statement).one()
return False
except NoResultFound:
session.add(demand)
session.commit()
session.refresh(demand)
return demand
epic
activate_epic(epic_id=None, session=Depends(get_session))
async
Activate an epic using its ID as a key.
Parameters
epic_id : str ID of epic to be activated. session : Session SQL session that is to be used to activate the epic. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\epic.py
@router.put("/{epic_id}/activate")
async def activate_epic(
epic_id: str = None,
session: Session = Depends(get_session),
):
"""
Activate an epic using its ID as a key.
Parameters
----------
epic_id : str
ID of epic to be activated.
session : Session
SQL session that is to be used to activate the epic.
Defaults to creating a dependency on the running SQL model session.
"""
statement = select(Epic).where(Epic.id == epic_id)
epic_to_activate = session.exec(statement).one()
epic_to_activate.is_active = True
epic_to_activate.updated_at = datetime.now()
session.add(epic_to_activate)
session.commit()
session.refresh(epic_to_activate)
return epic_to_activate
deactivate_epic(epic_id=None, session=Depends(get_session))
async
Deactivate an epic using its ID as a key.
Parameters
epic_id : str ID of epic to be deactivated. session : Session SQL session that is to be used to deactivate the epic. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\epic.py
@router.put("/{epic_id}/deactivate")
async def deactivate_epic(
epic_id: str = None,
session: Session = Depends(get_session),
):
"""
Deactivate an epic using its ID as a key.
Parameters
----------
epic_id : str
ID of epic to be deactivated.
session : Session
SQL session that is to be used to deactivate the epic.
Defaults to creating a dependency on the running SQL model session.
"""
statement = select(Epic).where(Epic.id == epic_id)
epic_to_deactivate = session.exec(statement).one()
epic_to_deactivate.is_active = False
epic_to_deactivate.updated_at = datetime.now()
session.add(epic_to_deactivate)
session.commit()
session.refresh(epic_to_deactivate)
return epic_to_deactivate
get_active_epics_list(session=Depends(get_session))
async
Get list of active epics.
Parameters
session : Session SQL session that is to be used to get a list of the active epics. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\epic.py
@router.get("/active")
async def get_active_epics_list(session: Session = Depends(get_session)):
"""
Get list of active epics.
Parameters
----------
session : Session
SQL session that is to be used to get a list of the active epics.
Defaults to creating a dependency on the running SQL model session.
"""
statement = select(Epic).where(Epic.is_active == True)
results = session.exec(statement).all()
return results
get_client_name_by_epic_id(epic_id, session=Depends(get_session))
async
Get client name from epic_id.
Parameters
epic_id : int ID of epic to pull client name from. session : Session SQL session that is to be used to pull the client name. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\epic.py
@router.get("/{epic_id}/client-name")
async def get_client_name_by_epic_id(
epic_id: int, session: Session = Depends(get_session)
):
"""
Get client name from epic_id.
Parameters
----------
epic_id : int
ID of epic to pull client name from.
session : Session
SQL session that is to be used to pull the client name.
Defaults to creating a dependency on the running SQL model session.
"""
statement = (
select(Client.name.label("client_name"), Client.id.label("client_id"))
.select_from(Epic)
.join(Sponsor, isouter=True)
.join(Client, isouter=True)
.where(Epic.id == epic_id)
.where(Client.is_active == True)
)
result = session.exec(statement).one()
return result
get_epic_by_team_sponsor(team_id, sponsor_id)
async
Get list of epics by team id and sponsor id.
Parameters
team_id : int ID of team to pull epics from. sponsor_id : int ID of sponsor to pull epics from. session : Session SQL session that is to be used to pull the epics. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\epic.py
@router.get("/teams/{team_id}/sponsors/{sponsor_id}/")
async def get_epic_by_team_sponsor(team_id: int, sponsor_id: int):
"""
Get list of epics by team id and sponsor id.
Parameters
----------
team_id : int
ID of team to pull epics from.
sponsor_id : int
ID of sponsor to pull epics from.
session : Session
SQL session that is to be used to pull the epics.
Defaults to creating a dependency on the running SQL model session.
"""
statement = (
select(
Epic.id.label("epic_id"),
Epic.name.label("epic_name"),
Epic.start_date,
Team.name.label("team_name"),
Sponsor.short_name.label("sponsor_short_name"),
)
.select_from(Epic)
.join(Team)
.join(Sponsor)
.where(Epic.team_id == team_id)
.where(Epic.sponsor_id == sponsor_id)
.where(Epic.is_active == True)
)
results = session.exec(statement).all()
return results
get_epic_list(session=Depends(get_session))
async
Get list of epics.
Parameters
session : Session SQL session that is to be used to get a list of the epics. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\epic.py
@router.get("/")
async def get_epic_list(session: Session = Depends(get_session)):
"""
Get list of epics.
Parameters
----------
session : Session
SQL session that is to be used to get a list of the epics.
Defaults to creating a dependency on the running SQL model session.
"""
statement = select(Epic)
results = session.exec(statement).all()
return results
post_epic(*, epic, session=Depends(get_session))
async
Post new epic.
Parameters
epic : Epic Epic that is to be added to the database. session : Session SQL session that is to be used to add the epic. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\epic.py
@router.post("/")
async def post_epic(
*,
epic: Epic,
session: Session = Depends(get_session),
):
"""
Post new epic.
Parameters
----------
epic : Epic
Epic that is to be added to the database.
session : Session
SQL session that is to be used to add the epic.
Defaults to creating a dependency on the running SQL model session.
"""
statement1 = select(Epic).where(Epic.name == epic.name)
try:
result = session.exec(statement1).one()
return False
except NoResultFound:
session.add(epic)
session.commit()
session.refresh(epic)
return epic
update_epic(epic_id=None, new_short_name=None, new_name=None, session=Depends(get_session))
async
Update an epic.
Parameters
epic_id : str ID of epic to be updated. new_short_name : str Name of new short name. new_name : str Name of new name. session : Session SQL session that is to be used to update the epic. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\epic.py
@router.put("/")
async def update_epic(
epic_id: str = None,
new_short_name: str = None,
new_name: str = None,
session: Session = Depends(get_session),
):
"""
Update an epic.
Parameters
----------
epic_id : str
ID of epic to be updated.
new_short_name : str
Name of new short name.
new_name : str
Name of new name.
session : Session
SQL session that is to be used to update the epic.
Defaults to creating a dependency on the running SQL model session.
"""
statement = select(Epic).where(Epic.id == epic_id).where(Epic.is_active == True)
epic_to_update = session.exec(statement).one()
if new_short_name != None:
epic_to_update.short_name = new_short_name
if new_name != None:
epic_to_update.name = new_name
session.add(epic_to_update)
epic_to_update.updated_at = datetime.now()
session.commit()
session.refresh(epic_to_update)
return epic_to_update
epic_area
activate_epic_area(epic_area_name=None, session=Depends(get_session))
async
Activate an epic area using its name as a key.
Parameters
epic_area_name : str Name of the epic area to be activated session : Session SQL session that is to be used to activate an epic area. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\epic_area.py
@router.put("/{epic_area_name}/activate")
async def activate_epic_area(
epic_area_name: str = None,
session: Session = Depends(get_session),
):
"""
Activate an epic area using its name as a key.
Parameters
----------
epic_area_name : str
Name of the epic area to be activated
session : Session
SQL session that is to be used to activate an epic area.
Defaults to creating a dependency on the running SQL model session.
"""
statement = select(EpicArea).where(EpicArea.name == epic_area_name)
epic_area_to_activate = session.exec(statement).one()
epic_area_to_activate.is_active = True
epic_area_to_activate.updated_at = datetime.now()
session.add(epic_area_to_activate)
session.commit()
session.refresh(epic_area_to_activate)
return epic_area_to_activate
deactivate_epic_area(epic_area_name=None, session=Depends(get_session))
async
Deactivate an epic area using its name as a key.
Parameters
epic_area_name : str Name of the epic area to be deactivated session : Session SQL session that is to be used to deactivate an epic area. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\epic_area.py
@router.put("/{epic_area_name}/deactivate")
async def deactivate_epic_area(
epic_area_name: str = None,
session: Session = Depends(get_session),
):
"""
Deactivate an epic area using its name as a key.
Parameters
----------
epic_area_name : str
Name of the epic area to be deactivated
session : Session
SQL session that is to be used to deactivate an epic area.
Defaults to creating a dependency on the running SQL model session.
"""
statement = select(EpicArea).where(EpicArea.name == epic_area_name)
epic_area_to_deactivate = session.exec(statement).one()
epic_area_to_deactivate.is_active = False
epic_area_to_deactivate.updated_at = datetime.now()
session.add(epic_area_to_deactivate)
session.commit()
session.refresh(epic_area_to_deactivate)
return epic_area_to_deactivate
get_active_epic_area_list(session=Depends(get_session))
async
Get list of active epic areas along with the name of their respective epics.
Parameters
session : Session SQL session that is to be used to get a list of the active epic areas. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\epic_area.py
@router.get("/active")
async def get_active_epic_area_list(session: Session = Depends(get_session)):
"""
Get list of active epic areas along with the name of their respective epics.
Parameters
----------
session : Session
SQL session that is to be used to get a list of the active epic areas.
Defaults to creating a dependency on the running SQL model session.
"""
statement = (
select(
EpicArea.id,
EpicArea.epic_id,
EpicArea.name.label("epic_area_name"),
Epic.id,
Epic.name.label("epic_name"),
)
.join(Epic)
.where(EpicArea.is_active == True)
)
results = session.exec(statement).all()
return results
get_epic_area_list(session=Depends(get_session))
async
Get list of epic areas.
Parameters
session : Session SQL session that is to be used to get a list of the epic areas. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\epic_area.py
@router.get("/")
async def get_epic_area_list(session: Session = Depends(get_session)):
"""
Get list of epic areas.
Parameters
----------
session : Session
SQL session that is to be used to get a list of the epic areas.
Defaults to creating a dependency on the running SQL model session.
"""
statement = select(EpicArea)
results = session.exec(statement).all()
return results
get_epic_name_by_epic_area_id(epic_area_id, session=Depends(get_session))
async
Return the epic name using the epic area id as a key.
Parameters
session : Session SQL session that is to be used to get the name of the epic. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\epic_area.py
@router.get("/{epic_area_id}/epic-name")
async def get_epic_name_by_epic_area_id(
epic_area_id: int, session: Session = Depends(get_session)
):
"""
Return the epic name using the epic area id as a key.
Parameters
----------
session : Session
SQL session that is to be used to get the name of the epic.
Defaults to creating a dependency on the running SQL model session.
"""
statement = (
select(EpicArea.id, Epic.id, Epic.name)
.join(Epic)
.where(EpicArea.id == epic_area_id)
.where(Epic.is_active == True)
)
result = session.exec(statement).one()
return result
post_epic_area(*, epic_area, session=Depends(get_session))
async
Post new epic area.
Parameters
epic_area : EpicArea Epic area that is to be added to the database. session : Session SQL session that is to be used to add the epic area. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\epic_area.py
@router.post("/")
async def post_epic_area(
*,
epic_area: EpicArea,
session: Session = Depends(get_session),
):
"""
Post new epic area.
Parameters
----------
epic_area : EpicArea
Epic area that is to be added to the database.
session : Session
SQL session that is to be used to add the epic area.
Defaults to creating a dependency on the running SQL model session.
"""
statement1 = select(EpicArea).where(
or_(EpicArea.name == epic_area.name, EpicArea.id == epic_area.id)
)
try:
result = session.exec(statement1).one()
return False
except NoResultFound:
session.add(epic_area)
session.commit()
session.refresh(epic_area)
return epic_area
read_epic_areas(epic_area_name=None, session=Depends(get_session))
async
Return a single epic area using a given epic area name as a key.
Parameters
epic_area_name : str Name of the epic area to be returned. session : Session SQL session that is to be used to read the single epic area. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\epic_area.py
@router.get("/{epic_name}")
async def read_epic_areas(
epic_area_name: str = None, session: Session = Depends(get_session)
):
"""
Return a single epic area using a given epic area name as a key.
Parameters
----------
epic_area_name : str
Name of the epic area to be returned.
session : Session
SQL session that is to be used to read the single epic area.
Defaults to creating a dependency on the running SQL model session.
"""
statement = select(EpicArea).where(EpicArea.name == epic_area_name)
try:
result = session.exec(statement).one()
return result
except NoResultFound:
msg = f"""There is no epic area named {epic_area_name}"""
return msg
update_epic_area(id=None, epic_id=None, name=None, is_active=None, session=Depends(get_session))
async
Update an epic area with new values.
Parameters
id : str ID of epic area to be updated. epic_id : str Epic ID to be updated. epic_area_name : str Name of the epic area to be updated. is_active : bool Change the status of the epic area. session : Session SQL session that is to be used to deactivate an epic area. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\epic_area.py
@router.put("/")
async def update_epic_area(
id: str = None,
epic_id: str = None,
name: str = None,
is_active: bool = None,
session: Session = Depends(get_session),
):
"""
Update an epic area with new values.
Parameters
----------
id : str
ID of epic area to be updated.
epic_id : str
Epic ID to be updated.
epic_area_name : str
Name of the epic area to be updated.
is_active : bool
Change the status of the epic area.
session : Session
SQL session that is to be used to deactivate an epic area.
Defaults to creating a dependency on the running SQL model session.
"""
statement = select(EpicArea).where(or_(EpicArea.name == name, EpicArea.id == id))
epic_area_to_update = session.exec(statement).one()
epic_area_to_update.epic_id = epic_id
epic_area_to_update.name = name
epic_area_to_update.is_active = is_active
session.add(epic_area_to_update)
epic_area_to_update.updated_at = datetime.now()
session.commit()
session.refresh(epic_area_to_update)
return epic_area_to_update
forecast
delete_forecasts(forecast_id=None, session=Depends(get_session))
async
Delete a forecast
Parameters
forecast_id : str ID of forecast to delete. session : Session SQL session that is to be used to delete the forecast. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\forecast.py
@router.delete("/")
async def delete_forecasts(
forecast_id: str = None,
session: Session = Depends(get_session),
):
"""
Delete a forecast
Parameters
----------
forecast_id : str
ID of forecast to delete.
session : Session
SQL session that is to be used to delete the forecast.
Defaults to creating a dependency on the running SQL model session.
"""
statement = select(Forecast).where(
Forecast.id == forecast_id,
)
forecast_to_delete = session.exec(statement).one()
session.delete(forecast_to_delete)
session.commit()
return True
get_forecasts(session=Depends(get_session))
async
Get list of forecasts.
Parameters
session : Session SQL session that is to be used to get a list of the forecasts. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\forecast.py
@router.get("/")
async def get_forecasts(session: Session = Depends(get_session)):
"""
Get list of forecasts.
Parameters
----------
session : Session
SQL session that is to be used to get a list of the forecasts.
Defaults to creating a dependency on the running SQL model session.
"""
statement = select(Forecast)
result = session.exec(statement).all()
return result
get_forecasts_by_user_year_epic(user_id, epic_id, year, month, session=Depends(get_session))
async
Get forecast by user, epic, year, month
Parameters
user_id ID of user from which to pull forecasts from. epic_id ID of epic from which to pull forecasts from. year Year from which to pull forecasts from. month Month from which to pull forecasts from. session : Session SQL session that is to be used to get the forecasts. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\forecast.py
@router.get("/users/{user_id}/epics/{epic_id}/year/{year}/month/{month}")
async def get_forecasts_by_user_year_epic(
user_id, epic_id, year, month, session: Session = Depends(get_session)
):
"""
Get forecast by user, epic, year, month
Parameters
----------
user_id
ID of user from which to pull forecasts from.
epic_id
ID of epic from which to pull forecasts from.
year
Year from which to pull forecasts from.
month
Month from which to pull forecasts from.
session : Session
SQL session that is to be used to get the forecasts.
Defaults to creating a dependency on the running SQL model session.
"""
statement = (
select(Forecast.id, Forecast.month, Forecast.year, Forecast.days)
.where(Forecast.user_id == user_id)
.where(Forecast.epic_id == epic_id)
.where(Forecast.year == year)
.where(Forecast.month == month)
)
results = session.exec(statement).all()
return results
get_forecasts_users(user_id=None, session=Depends(get_session))
async
Get forecasts from a given user.
Parameters
user_id : str User ID of user from which forecasts are to be pulled from. session : Session SQL session that is to be used to get the forecasts. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\forecast.py
@router.get("/{user_id}")
async def get_forecasts_users(
user_id: str = None, session: Session = Depends(get_session)
):
"""
Get forecasts from a given user.
Parameters
----------
user_id : str
User ID of user from which forecasts are to be pulled from.
session : Session
SQL session that is to be used to get the forecasts.
Defaults to creating a dependency on the running SQL model session.
"""
if user_id != None:
statement = (
select(
Epic.name,
Forecast.user_id,
Forecast.month,
Forecast.year,
Forecast.days,
)
.join(Epic)
.where(Forecast.user_id == user_id)
)
results = session.exec(statement).all()
return results
else:
raise ValueError
post_forecast(*, forecast, session=Depends(get_session))
async
Post a new forecast.
Parameters
forecast : Forecast Forecast that is to be added to the database. session : Session SQL session that is to be used to add the forecast. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\forecast.py
@router.post("/")
async def post_forecast(*, forecast: Forecast, session: Session = Depends(get_session)):
"""
Post a new forecast.
Parameters
----------
forecast : Forecast
Forecast that is to be added to the database.
session : Session
SQL session that is to be used to add the forecast.
Defaults to creating a dependency on the running SQL model session.
"""
statement = select(Forecast).where(
and_(
Forecast.epic_id == forecast.epic_id,
Forecast.user_id == forecast.user_id,
Forecast.year == forecast.year,
Forecast.month == forecast.month,
)
)
try:
result = session.exec(statement).one()
return False
except NoResultFound:
session.add(forecast)
session.commit()
session.refresh(forecast)
return forecast
update_forecasts(user_id=None, epic_id=None, month=None, year=None, days=None, session=Depends(get_session))
async
Update a forecast.
Parameters
user_id : str ID of user to update. epic_id : str ID of epic to update. month : int Month to update. year : int Year to update. days : float Days to update. session : Session SQL session that is to be used to update the forecast. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\forecast.py
@router.put("/new-days")
async def update_forecasts(
user_id: str = None,
epic_id: str = None,
month: int = None,
year: int = None,
days: float = None,
session: Session = Depends(get_session),
):
"""
Update a forecast.
Parameters
----------
user_id : str
ID of user to update.
epic_id : str
ID of epic to update.
month : int
Month to update.
year : int
Year to update.
days : float
Days to update.
session : Session
SQL session that is to be used to update the forecast.
Defaults to creating a dependency on the running SQL model session.
"""
statement = select(Forecast).where(
and_(
Forecast.user_id == user_id,
Forecast.epic_id == epic_id,
Forecast.month == month,
Forecast.year == year,
)
)
forecast_to_update = session.exec(statement).one()
forecast_to_update.days = days
session.add(forecast_to_update)
session.commit()
session.refresh(forecast_to_update)
return forecast_to_update
rate
activate_rate(rate_id, session=Depends(get_session))
async
Activate a rate using its id as a key.
Parameters
rate_id : str ID of the rate to be activated. session : Session SQL session that is to be used to activate a rate. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\rate.py
@router.put("/{rate_id}/activate")
async def activate_rate(
rate_id: str,
session: Session = Depends(get_session),
):
"""
Activate a rate using its id as a key.
Parameters
----------
rate_id : str
ID of the rate to be activated.
session : Session
SQL session that is to be used to activate a rate.
Defaults to creating a dependency on the running SQL model session.
"""
statement = select(Rate).where(Rate.id == rate_id)
rate_to_activate = session.exec(statement).one()
rate_to_activate.is_active = True
rate_to_activate.updated_at = datetime.now()
session.add(rate_to_activate)
session.commit()
session.refresh(rate_to_activate)
return rate_to_activate
deactivate_rate_id(rate_id=None, session=Depends(get_session))
async
Deactivate a rate using its id as a key.
Parameters
rate_id : str ID of the rate to be deactivated. session : Session SQL session that is to be used to deactivate a rate. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\rate.py
@router.put("/{rate_id}/deactivate")
async def deactivate_rate_id(
rate_id: str = None,
session: Session = Depends(get_session),
):
"""
Deactivate a rate using its id as a key.
Parameters
----------
rate_id : str
ID of the rate to be deactivated.
session : Session
SQL session that is to be used to deactivate a rate.
Defaults to creating a dependency on the running SQL model session.
"""
statement = select(Rate).where(Rate.id == rate_id)
rate_id_to_deactivate = session.exec(statement).one()
rate_id_to_deactivate.is_active = False
rate_id_to_deactivate.updated_at = datetime.now()
session.add(rate_id_to_deactivate)
session.commit()
session.refresh(rate_id_to_deactivate)
return rate_id_to_deactivate
rate(rate, session=Depends(get_session))
async
Post new rate
Source code in backend\api\rate.py
@router.post("/")
async def rate(
rate: Rate,
session: Session = Depends(get_session),
):
"""Post new rate"""
"""
Post new rate.
Parameters
----------
rate : Rate
Rate that is to be added to the database.
session : Session
SQL session that is to be used to add the rate.
Defaults to creating a dependency on the running SQL model session.
"""
statement1 = (
select(Rate)
.where(Rate.user_id == rate.user_id)
.where(Rate.client_id == rate.client_id)
.where(Rate.valid_from >= rate.valid_from)
)
one_day_delta = timedelta(days=1)
close_date = rate.valid_from - one_day_delta
statement2 = (
select(Rate)
.where(Rate.user_id == rate.user_id)
.where(Rate.client_id == rate.client_id)
.where(Rate.valid_to == far_date)
)
try:
result = session.exec(statement1).one()
return False
except NoResultFound:
try:
rate_to_close = session.exec(statement2).one()
rate_to_close.valid_to = close_date
rate_to_close.updated_at = datetime.now()
session.add(rate_to_close)
session.add(rate)
session.commit()
return True
except NoResultFound:
session.add(rate)
session.commit()
return True
rates_by_user_client(user_id, client_id, session=Depends(get_session))
async
Get list of rates using given user ids and clients ids as keys.
Parameters
user_id : int User id that is used to get list of rates. client_id : int Client id that is used to get the list of rates. session : Session SQL session that is to be used to read a certain rates. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\rate.py
@router.get("/users/{user_id}/clients/{client_id}/")
async def rates_by_user_client(
user_id: int,
client_id: int,
session: Session = Depends(get_session),
):
"""
Get list of rates using given user ids and clients ids as keys.
Parameters
----------
user_id : int
User id that is used to get list of rates.
client_id : int
Client id that is used to get the list of rates.
session : Session
SQL session that is to be used to read a certain rates.
Defaults to creating a dependency on the running SQL model session.
"""
statement = (
select(Rate).where(Rate.user_id == user_id).where(Rate.client_id == client_id)
)
result = session.exec(statement).all()
return result
rates_by_user_client_date(user_id, client_id, date, session=Depends(get_session))
async
Get rates from a certain date using a user id and client id as keys.
Parameters
user_id : int User id of user who's rate is in question. client_id : int Client id of client that's contracting the user. date : str Date from which the rates are needed. session : Session SQL session that is to be used to get the rates. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\rate.py
@router.get("/users/{user_id}/clients/{client_id}/months/")
async def rates_by_user_client_date(
user_id: int,
client_id: int,
date: str,
session: Session = Depends(get_session),
):
"""
Get rates from a certain date using a user id and client id as keys.
Parameters
----------
user_id : int
User id of user who's rate is in question.
client_id : int
Client id of client that's contracting the user.
date : str
Date from which the rates are needed.
session : Session
SQL session that is to be used to get the rates.
Defaults to creating a dependency on the running SQL model session.
"""
month_start_date = date_str_to_date(date)
statement = (
select(Rate)
.where(Rate.user_id == user_id)
.where(Rate.client_id == client_id)
.where(Rate.valid_from <= month_start_date)
.where(Rate.valid_to > month_start_date)
)
result = session.exec(statement).all()
return result
read_rates(session=Depends(get_session))
async
Get all rates.
Parameters
session : Session SQL session that is to be used to get the rates. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\rate.py
@router.get("/")
async def read_rates(
session: Session = Depends(get_session),
):
"""
Get all rates.
Parameters
----------
session : Session
SQL session that is to be used to get the rates.
Defaults to creating a dependency on the running SQL model session.
"""
statement = select(Rate)
result = session.exec(statement).all()
return result
update_rates(rate_id=None, new_amount=None, session=Depends(get_session))
async
Update a rate with new values.
Parameters
user_id : str ID of user to be updated. client_id : str ID of client to be updated. new_amount : str Amount to be updated. session : Session SQL session that is to be used to update the rate. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\rate.py
@router.put("/")
async def update_rates(
rate_id: int = None,
new_amount: str = None,
session: Session = Depends(get_session),
):
"""
Update a rate with new values.
Parameters
----------
user_id : str
ID of user to be updated.
client_id : str
ID of client to be updated.
new_amount : str
Amount to be updated.
session : Session
SQL session that is to be used to update the rate.
Defaults to creating a dependency on the running SQL model session.
"""
statement = select(Rate).where(Rate.id == rate_id)
rate_to_update = session.exec(statement).one()
rate_to_update.amount = new_amount
session.add(rate_to_update)
session.commit()
session.refresh(rate_to_update)
return True
role
activate_role(role_id=None, session=Depends(get_session))
async
Activate a role using the role ID as a key.
Parameters
role_id : str ID of role to be activated. session : Session SQL session that is to be used to activate the role. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\role.py
@router.put("/{role_id}/activate")
async def activate_role(
role_id: str = None,
session: Session = Depends(get_session),
):
"""
Activate a role using the role ID as a key.
Parameters
----------
role_id : str
ID of role to be activated.
session : Session
SQL session that is to be used to activate the role.
Defaults to creating a dependency on the running SQL model session.
"""
statement = select(Role).where(Role.id == role_id)
role_to_activate = session.exec(statement).one()
role_to_activate.is_active = True
role_to_activate.updated_at = datetime.now()
session.add(role_to_activate)
session.commit()
session.refresh(role_to_activate)
return role_to_activate
deactivate_role(role_id=None, session=Depends(get_session))
async
Deactivate a role using the role ID as a key.
Parameters
role_id : str ID of role to be deactivated. session : Session SQL session that is to be used to deactivate the role. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\role.py
@router.put("/{role_id}/deactivate")
async def deactivate_role(
role_id: str = None,
session: Session = Depends(get_session),
):
"""
Deactivate a role using the role ID as a key.
Parameters
----------
role_id : str
ID of role to be deactivated.
session : Session
SQL session that is to be used to deactivate the role.
Defaults to creating a dependency on the running SQL model session.
"""
statement = select(Role).where(Role.id == role_id)
role_to_deactivate = session.exec(statement).one()
role_to_deactivate.is_active = False
role_to_deactivate.updated_at = datetime.now()
session.add(role_to_deactivate)
session.commit()
session.refresh(role_to_deactivate)
return role_to_deactivate
post_role(*, role, session=Depends(get_session))
async
Post a new role.
Parameters
role : Role Role that is to be added to the database. session : Session SQL session that is to be used to add the role. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\role.py
@router.post("/")
async def post_role(*, role: Role, session: Session = Depends(get_session)):
"""
Post a new role.
Parameters
----------
role : Role
Role that is to be added to the database.
session : Session
SQL session that is to be used to add the role.
Defaults to creating a dependency on the running SQL model session.
"""
statement = select(Role).where(Role.id == role.id)
try:
result = session.exec(statement).one()
return False
except NoResultFound:
session.add(role)
session.commit()
session.refresh(role)
return role
read_roles(session=Depends(get_session))
async
Get list of active roles.
Parameters
session : Session SQL session that is to be used to get the roles. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\role.py
@router.get("/active")
async def read_roles(session: Session = Depends(get_session)):
"""
Get list of active roles.
Parameters
----------
session : Session
SQL session that is to be used to get the roles.
Defaults to creating a dependency on the running SQL model session.
"""
statement = select(Role).where(Role.is_active == True)
results = session.exec(statement).all()
return results
update_role(id=None, new_name=None, new_short_name=None, is_active=None, session=Depends(get_session))
async
Update a role.
Parameters
id : str ID of role to be updated. new_name : str New name of the role. new_short_name : str New short name of the role. is_active : bool New status of the role. session : Session SQL session that is to be used to update the role. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\role.py
@router.put("/")
async def update_role(
id: str = None,
new_name: str = None,
new_short_name: str = None,
is_active: bool = None,
session: Session = Depends(get_session),
):
"""
Update a role.
Parameters
----------
id : str
ID of role to be updated.
new_name : str
New name of the role.
new_short_name : str
New short name of the role.
is_active : bool
New status of the role.
session : Session
SQL session that is to be used to update the role.
Defaults to creating a dependency on the running SQL model session.
"""
statement = select(Role.is_active).where(Role.id == id)
result = session.exec(statement).first()
if result == True:
statement = select(Role).where(Role.id == id)
role_to_update = session.exec(statement).one()
if new_name != None:
role_to_update.name = new_name
if new_short_name != None:
role_to_update.short_name = new_short_name
if is_active != None:
role_to_update.is_active = is_active
session.add(role_to_update)
role_to_update.updated_at = datetime.now()
session.commit()
session.refresh(role_to_update)
return role_to_update
else:
return False
sponsor
activate_sponsor(sponsor_name=None, session=Depends(get_session))
async
Activate a sponsor.
Parameters
sponsor_name : str Name of sponsor to be activated. session : Session SQL session that is to be used to activate the sponsor. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\sponsor.py
@router.put("/{sponsor_name}/activate")
async def activate_sponsor(
sponsor_name: str = None,
session: Session = Depends(get_session),
):
"""
Activate a sponsor.
Parameters
----------
sponsor_name : str
Name of sponsor to be activated.
session : Session
SQL session that is to be used to activate the sponsor.
Defaults to creating a dependency on the running SQL model session.
"""
statement = select(Sponsor).where(Sponsor.name == sponsor_name)
sponsor_to_activate = session.exec(statement).one()
sponsor_to_activate.is_active = True
sponsor_to_activate.updated_at = datetime.now()
session.add(sponsor_to_activate)
session.commit()
session.refresh(sponsor_to_activate)
return sponsor_to_activate
deactivate_sponsor(sponsor_name=None, session=Depends(get_session))
async
Deactivate a sponsor.
Parameters
sponsor_name : str Name of sponsor to be deactivated. session : Session SQL session that is to be used to deactivate the sponsor. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\sponsor.py
@router.put("/{sponsor_name}/deactivate")
async def deactivate_sponsor(
sponsor_name: str = None,
session: Session = Depends(get_session),
):
"""
Deactivate a sponsor.
Parameters
----------
sponsor_name : str
Name of sponsor to be deactivated.
session : Session
SQL session that is to be used to deactivate the sponsor.
Defaults to creating a dependency on the running SQL model session.
"""
statement = select(Sponsor).where(Sponsor.name == sponsor_name)
sponsor_to_deactivate = session.exec(statement).one()
sponsor_to_deactivate.is_active = False
sponsor_to_deactivate.updated_at = datetime.now()
session.add(sponsor_to_deactivate)
session.commit()
session.refresh(sponsor_to_deactivate)
return sponsor_to_deactivate
get_active_sponsor_list(session=Depends(get_session))
async
Get list of active sponsors along with the name of the client.
Parameters
session : Session SQL session that is to be used to get a list of the sponsors. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\sponsor.py
@router.get("/active")
async def get_active_sponsor_list(session: Session = Depends(get_session)):
"""
Get list of active sponsors along with the name of the client.
Parameters
----------
session : Session
SQL session that is to be used to get a list of the sponsors.
Defaults to creating a dependency on the running SQL model session.
"""
statement = (
select(
Sponsor.id,
Sponsor.client_id,
Sponsor.name.label("sponsor_name"),
Sponsor.short_name.label("sponsor_short_name"),
Client.name.label("client_name"),
)
.join(Client)
.where(Sponsor.is_active == True)
)
results = session.exec(statement).all()
return results
get_client_name_by_sponsor_id(sponsor_id, session=Depends(get_session))
async
Get client name by sponsor id.
Parameters
sponsor_id : int ID of sponsor to pull client name from. session : Session SQL session that is to be used to get the client name. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\sponsor.py
@router.get("/{sponsor_id}/client-name")
async def get_client_name_by_sponsor_id(
sponsor_id: int, session: Session = Depends(get_session)
):
"""
Get client name by sponsor id.
Parameters
----------
sponsor_id : int
ID of sponsor to pull client name from.
session : Session
SQL session that is to be used to get the client name.
Defaults to creating a dependency on the running SQL model session.
"""
statement = (
select(Sponsor.id, Client.id, Client.name)
.join(Client)
.where(Sponsor.id == sponsor_id)
.where(Client.is_active == True)
)
result = session.exec(statement).one()
return result
get_sponsor_list(session=Depends(get_session))
async
Get entire sponsor list (enabled and disabled).
Parameters
session : Session SQL session that is to be used to get a list of the sponsors. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\sponsor.py
@router.get("/")
async def get_sponsor_list(session: Session = Depends(get_session)):
"""
Get entire sponsor list (enabled and disabled).
Parameters
----------
session : Session
SQL session that is to be used to get a list of the sponsors.
Defaults to creating a dependency on the running SQL model session.
"""
statement = select(Sponsor)
results = session.exec(statement).all()
return results
post_sponsor(*, sponsor, session=Depends(get_session))
async
Post new sponsor.
Parameters
sponsor : Sponsor Sponsor that is to be added to the database. session : Session SQL session that is to be used to add the sponsor. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\sponsor.py
@router.post("/")
async def post_sponsor(
*,
sponsor: Sponsor,
session: Session = Depends(get_session),
):
"""
Post new sponsor.
Parameters
----------
sponsor : Sponsor
Sponsor that is to be added to the database.
session : Session
SQL session that is to be used to add the sponsor.
Defaults to creating a dependency on the running SQL model session.
"""
statement = select(Sponsor).where(
or_(Sponsor.name == sponsor.name, Sponsor.id == sponsor.id)
)
try:
result = session.exec(statement).one()
return False
except NoResultFound:
session.add(sponsor)
session.commit()
session.refresh(sponsor)
return sponsor
read_teams(sponsor_name=None, session=Depends(get_session))
async
Read the contents of a given sponsor.
Parameters
sponsor_name : str Name of sponsor to be read. session : Session SQL session that is to be used to read a sponsor. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\sponsor.py
@router.get("/{sponsor_name}")
async def read_teams(sponsor_name: str = None, session: Session = Depends(get_session)):
"""
Read the contents of a given sponsor.
Parameters
----------
sponsor_name : str
Name of sponsor to be read.
session : Session
SQL session that is to be used to read a sponsor.
Defaults to creating a dependency on the running SQL model session.
"""
statement = select(Sponsor).where(Sponsor.name == sponsor_name)
try:
result = session.exec(statement).ones()
return result
except NoResultFound:
msg = f"""There is no sponsor named {sponsor_name}"""
update_sponsor(id=None, client_id=None, name=None, short_name=None, is_active=None, session=Depends(get_session))
async
Update a sponsor.
Parameters
id : int ID of sponsor to be updated. client_id : int Updated client ID. name : str Updated sponsor name. short_name : str Updated short name of sponsor. is_active : bool Updated status of sponsor. session : Session SQL session that is to be used to update the sponsor. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\sponsor.py
@router.put("/")
async def update_sponsor(
id: int = None,
client_id: int = None,
name: str = None,
short_name: str = None,
is_active: bool = None,
session: Session = Depends(get_session),
):
"""
Update a sponsor.
Parameters
----------
id : int
ID of sponsor to be updated.
client_id : int
Updated client ID.
name : str
Updated sponsor name.
short_name : str
Updated short name of sponsor.
is_active : bool
Updated status of sponsor.
session : Session
SQL session that is to be used to update the sponsor.
Defaults to creating a dependency on the running SQL model session.
"""
statement = select(Sponsor).where(or_(Sponsor.name == name, Sponsor.id == id))
sponsor_to_update = session.exec(statement).one()
sponsor_to_update.client_id = client_id
sponsor_to_update.name = name
sponsor_to_update.short_name = short_name
sponsor_to_update.is_active = is_active
session.add(sponsor_to_update)
sponsor_to_update.updated_at = datetime.now()
session.commit()
session.refresh(sponsor_to_update)
return sponsor_to_update
team
activate_team(team_name=None, session=Depends(get_session))
async
Activate a team.
Parameters
team_name : str Name of team to be activated. session : Session SQL session that is to be used to activate the team. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\team.py
@router.put("/{team_name}/activate")
async def activate_team(
team_name: str = None,
session: Session = Depends(get_session),
):
"""
Activate a team.
Parameters
----------
team_name : str
Name of team to be activated.
session : Session
SQL session that is to be used to activate the team.
Defaults to creating a dependency on the running SQL model session.
"""
statement = select(Team).where(Team.name == team_name)
team_to_activate = session.exec(statement).one()
team_to_activate.is_active = True
team_to_activate.updated_at = datetime.now()
session.add(team_to_activate)
session.commit()
session.refresh(team_to_activate)
return team_to_activate
deactivate_team(team_name=None, session=Depends(get_session))
async
Deactivate a team.
Parameters
team_name : str Name of team to be deactivated. session : Session SQL session that is to be used to deactivate the team. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\team.py
@router.put("/{team_name}/deactivate")
async def deactivate_team(
team_name: str = None,
session: Session = Depends(get_session),
):
"""
Deactivate a team.
Parameters
----------
team_name : str
Name of team to be deactivated.
session : Session
SQL session that is to be used to deactivate the team.
Defaults to creating a dependency on the running SQL model session.
"""
statement = select(Team).where(Team.name == team_name)
team_to_deactivate = session.exec(statement).one()
team_to_deactivate.is_active = False
team_to_deactivate.updated_at = datetime.now()
session.add(team_to_deactivate)
session.commit()
session.refresh(team_to_deactivate)
return team_to_deactivate
get_active_team_list(session=Depends(get_session))
async
Get list of active teams.
Parameters
session : Session SQL session that is to be used to get a list of the active teams. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\team.py
@router.get("/active")
async def get_active_team_list(session: Session = Depends(get_session)):
"""
Get list of active teams.
Parameters
----------
session : Session
SQL session that is to be used to get a list of the active teams.
Defaults to creating a dependency on the running SQL model session.
"""
statement = (
select(
Team.id,
Team.lead_user_id,
Team.name.label("team_name"),
Team.short_name.label("team_short_name"),
User.id,
User.short_name.label("user_name"),
)
.join(User)
.where(Team.is_active == True)
)
results = session.exec(statement).all()
return results
get_team_list(session=Depends(get_session))
async
Get list of all teams.
Parameters
session : Session SQL session that is to be used to get the list of teams. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\team.py
@router.get("/")
async def get_team_list(session: Session = Depends(get_session)):
"""
Get list of all teams.
Parameters
----------
session : Session
SQL session that is to be used to get the list of teams.
Defaults to creating a dependency on the running SQL model session.
"""
statement = select(Team)
results = session.exec(statement).all()
return results
get_user_name_by_team_id(team_id, session=Depends(get_session))
async
Get user name by team id.
Parameters
team_id : str ID of team to pull user name from. session : Session SQL session that is to be used to get the user name. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\team.py
@router.get("/{team_id}/user-name")
async def get_user_name_by_team_id(
team_id: int, session: Session = Depends(get_session)
):
"""
Get user name by team id.
Parameters
----------
team_id : str
ID of team to pull user name from.
session : Session
SQL session that is to be used to get the user name.
Defaults to creating a dependency on the running SQL model session.
"""
statement = (
select(Team.id, User.id, User.name)
.join(User)
.where(Team.id == team_id)
.where(User.active == True)
)
result = session.exec(statement).one()
return result
post_team(*, team, session=Depends(get_session))
async
Post new team.
Parameters
team : Team Team that is to be added to the database. session : Session SQL session that is to be used to add the team. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\team.py
@router.post("/")
async def post_team(
*,
team: Team,
session: Session = Depends(get_session),
):
"""
Post new team.
Parameters
----------
team : Team
Team that is to be added to the database.
session : Session
SQL session that is to be used to add the team.
Defaults to creating a dependency on the running SQL model session.
"""
statement = select(Team).where(or_(Team.name == team.name, Team.id == team.id))
try:
result = session.exec(statement).one()
return False
except NoResultFound:
session.add(team)
session.commit()
session.refresh(team)
return team
read_teams(team_name=None, session=Depends(get_session))
async
Read the contents of a given team.
Parameters
team_name : str Name of team to be read. session : Session SQL session that is to be used to read the team. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\team.py
@router.get("/{team_name}")
async def read_teams(team_name: str = None, session: Session = Depends(get_session)):
"""
Read the contents of a given team.
Parameters
----------
team_name : str
Name of team to be read.
session : Session
SQL session that is to be used to read the team.
Defaults to creating a dependency on the running SQL model session.
"""
statement = select(Team).where(Team.name == team_name)
try:
result = session.exec(statement).one()
return result
except NoResultFound:
msg = f"""There is no team named {team_name}"""
return msg
update_team(id=None, lead_user_id=None, name=None, is_active=None, session=Depends(get_session))
async
Update a team with new values.
Parameters
id : str ID of team to be updated. lead_user_id : str Updated lead user ID. name : str Updated name of team. is_active : bool Updated status of team. session : Session SQL session that is to be used to update the team. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\team.py
@router.put("/")
async def update_team(
id: str = None,
lead_user_id: str = None,
name: str = None,
is_active: bool = None,
session: Session = Depends(get_session),
):
"""
Update a team with new values.
Parameters
----------
id : str
ID of team to be updated.
lead_user_id : str
Updated lead user ID.
name : str
Updated name of team.
is_active : bool
Updated status of team.
session : Session
SQL session that is to be used to update the team.
Defaults to creating a dependency on the running SQL model session.
"""
statement = select(Team).where(or_(Team.name == name, Team.id == id))
team_to_update = session.exec(statement).one()
team_to_update.lead_user_id = lead_user_id
team_to_update.name = name
team_to_update.is_active = is_active
session.add(team_to_update)
team_to_update.updated_at = datetime.now()
session.commit()
session.refresh(team_to_update)
return team_to_update
timelog
delete_timelogs(*, timelog_id, session=Depends(get_session))
async
Delete a timelog.
Parameters
timelog_id : int ID of timelog to be deleted. session : Session SQL session that is to be used to delete the timelog. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\timelog.py
@router.delete("/{timelog_id}")
async def delete_timelogs(
*,
timelog_id: int,
session: Session = Depends(get_session),
):
"""
Delete a timelog.
Parameters
----------
timelog_id : int
ID of timelog to be deleted.
session : Session
SQL session that is to be used to delete the timelog.
Defaults to creating a dependency on the running SQL model session.
"""
statement = select(TimeLog).where(TimeLog.id == timelog_id)
result = session.exec(statement).one()
timelog_to_delete = result
session.delete(timelog_to_delete)
session.commit()
return True
get_timelog_by_id(timelog_id, session=Depends(get_session))
async
Get timelog by id.
Parameters
timelog_id : int ID of timelog to be returned. session : Session SQL session that is to be used to get the timelog. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\timelog.py
@router.get("/{timelog_id}")
async def get_timelog_by_id(timelog_id: int, session: Session = Depends(get_session)):
"""
Get timelog by id.
Parameters
----------
timelog_id : int
ID of timelog to be returned.
session : Session
SQL session that is to be used to get the timelog.
Defaults to creating a dependency on the running SQL model session.
"""
statement = select(TimeLog).where(TimeLog.id == timelog_id)
result = session.exec(statement).one()
return result
get_timelog_user_id(*, user_id, epic_id, month, year, session=Depends(get_session))
async
Get list of timelogs by user_id, month.
Parameters
user_id : str ID of user from which to pull timelogs. year_month : int Month and year from which to pull timelog(s). session : Session SQL session that is to be used to get the timelogs. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\timelog.py
@router.get("/users/{user_id}/epics/{epic_id}")
async def get_timelog_user_id(
*,
user_id: int,
epic_id: int,
month: int,
year: int,
session: Session = Depends(get_session),
):
"""
Get list of timelogs by user_id, month.
Parameters
----------
user_id : str
ID of user from which to pull timelogs.
year_month : int
Month and year from which to pull timelog(s).
session : Session
SQL session that is to be used to get the timelogs.
Defaults to creating a dependency on the running SQL model session.
"""
statement = (
select(
TimeLog.id,
User.short_name.label("username"),
Epic.short_name.label("epic_name"),
TimeLog.start_time,
TimeLog.end_time,
TimeLog.count_hours,
TimeLog.count_days,
)
.join(User)
.join(Epic)
.where(TimeLog.user_id == user_id)
.where(TimeLog.epic_id == epic_id)
.where(TimeLog.month == month)
.where(TimeLog.year == year)
.order_by(TimeLog.end_time.desc())
)
results = session.exec(statement).all()
return results
get_timelogs_all(session=Depends(get_session))
async
Get list all timelogs.
Parameters
session : Session SQL session that is to be used to get the timelogs. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\timelog.py
@router.get("/")
async def get_timelogs_all(session: Session = Depends(get_session)):
"""
Get list all timelogs.
Parameters
----------
session : Session
SQL session that is to be used to get the timelogs.
Defaults to creating a dependency on the running SQL model session.
"""
statement = (
select(
TimeLog.id,
User.short_name.label("username"),
Epic.short_name.label("epic_name"),
TimeLog.start_time,
TimeLog.end_time,
TimeLog.count_hours,
TimeLog.count_days,
)
.join(User)
.join(Epic)
.order_by(TimeLog.end_time.desc())
)
results = session.exec(statement).all()
return results
timelog(*, timelog, session=Depends(get_session))
async
Post new timelog.
Example: timelog.start_time = "2022-01-19T08:30:00.000Z
Parameters
timelog : TimeLog Timelog that is to be added to the database. session : Session SQL session that is to be used to add the timelog. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\timelog.py
@router.post("/")
async def timelog(*, timelog: TimeLog, session: Session = Depends(get_session)):
"""
Post new timelog.
Example: timelog.start_time = "2022-01-19T08:30:00.000Z
Parameters
----------
timelog : TimeLog
Timelog that is to be added to the database.
session : Session
SQL session that is to be used to add the timelog.
Defaults to creating a dependency on the running SQL model session.
"""
statement1 = (
select(TimeLog)
.where(TimeLog.user_id == timelog.user_id)
.where(TimeLog.start_time >= timelog.start_time)
.where(TimeLog.start_time < timelog.end_time)
)
statement2 = (
select(TimeLog)
.where(TimeLog.user_id == timelog.user_id)
.where(TimeLog.end_time > timelog.start_time)
.where(TimeLog.end_time <= timelog.end_time)
)
statement3 = (
select(TimeLog)
.where(TimeLog.user_id == timelog.user_id)
.where(TimeLog.start_time >= timelog.start_time)
.where(TimeLog.end_time <= timelog.end_time)
)
statement4 = (
select(TimeLog)
.where(TimeLog.user_id == timelog.user_id)
.where(TimeLog.start_time < timelog.start_time)
.where(TimeLog.end_time > timelog.end_time)
)
results1 = session.exec(statement1).all()
results2 = session.exec(statement2).all()
results3 = session.exec(statement3).all()
results4 = session.exec(statement4).all()
if results1 or results2 or results3 or results4:
return "currently posted timelog overlaps another timelog"
else:
time_delta = timelog.end_time - timelog.start_time
work_delta_hours = time_delta.total_seconds() / 3600
work_hours = "{:.2f}".format(work_delta_hours)
work_delta_days = time_delta.total_seconds() / 3600 / 8
work_days = "{:.2f}".format(work_delta_days)
timelog.count_hours = work_hours
timelog.count_days = work_days
session.add(timelog)
session.commit()
session.refresh(timelog)
return timelog
update_timelogs(*, timelog_id=None, timelog_new_start_time=None, session=Depends(get_session))
async
Update a timelog.
Parameters
timelog_id : int ID of timelog to be updated. timelog_new_start_time : str Updated start time of timelog. session : Session SQL session that is to be used to updated the timelog. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\timelog.py
@router.put("/{timelog_id}/new-start-time")
async def update_timelogs(
*,
timelog_id: int = None,
timelog_new_start_time: str = None,
session: Session = Depends(get_session),
):
"""
Update a timelog.
Parameters
----------
timelog_id : int
ID of timelog to be updated.
timelog_new_start_time : str
Updated start time of timelog.
session : Session
SQL session that is to be used to updated the timelog.
Defaults to creating a dependency on the running SQL model session.
"""
statement = select(TimeLog).where(TimeLog.id == timelog_id)
timelog_to_update = session.exec(statement).one()
timelog_to_update.start_time = timelog_new_start_time
session.add(timelog_to_update)
session.commit()
session.refresh(timelog_to_update)
return timelog_to_update
user
get_users(session=Depends(get_session), is_active=None, short_name=None)
async
Get list of user(s).
Parameters
session : Session SQL session that is to be used to get the users. Defaults to creating a dependency on the running SQL model session. is_active : bool Status of users to be pulled. short_name : str Short name of user to be pulled.
Source code in backend\api\user.py
@router.get("/")
async def get_users(
session: Session = Depends(get_session),
is_active: bool = None,
short_name: str = None,
):
"""
Get list of user(s).
Parameters
----------
session : Session
SQL session that is to be used to get the users.
Defaults to creating a dependency on the running SQL model session.
is_active : bool
Status of users to be pulled.
short_name : str
Short name of user to be pulled.
"""
statement = select(User)
if is_active != None:
statement = (
select(
User.short_name,
User.first_name,
User.last_name,
Role.short_name.label("role_short_name"),
Team.short_name.label("main_team"),
User.start_date,
)
.select_from(User)
.join(Role, User.role_id == Role.id, isouter=True)
.join(Team, User.team_id == Team.id, isouter=True)
.where(User.is_active == is_active)
.order_by(User.start_date.desc())
)
result = session.exec(statement).all()
return result
post_user(user, session=Depends(get_session))
async
Post a new user.
Parameters
user : User User that is to be added to the database. session : Session SQL session that is to be used to add the user. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\user.py
@router.post("/")
async def post_user(
user: User,
session: Session = Depends(get_session),
):
"""
Post a new user.
Parameters
----------
user : User
User that is to be added to the database.
session : Session
SQL session that is to be used to add the user.
Defaults to creating a dependency on the running SQL model session.
"""
statement = select(User).where(User.short_name == user.short_name)
try:
result = session.exec(statement).one()
return False
except NoResultFound:
session.add(user)
session.commit()
session.refresh(user)
return user
update_user(user_id, is_active=None, new_short_name=None, new_first_name=None, new_last_name=None, new_email=None, new_team_id=None, session=Depends(get_session))
async
Update a user.
Parameters
user_id : int ID of user to be updated. is_active : Optional[bool] Updated status of user. new_short_name : Optional[bool] Updated short name of user. new_first_name : Optional[bool] Updated first name of user. new_last_name : Optional[bool] Updated last name of user. new_email : Optional[bool] Updated email of user new_team_id : Optional[bool] Updated team id. session : Session SQL session that is to be used to update the user. Defaults to creating a dependency on the running SQL model session.
Source code in backend\api\user.py
@router.put("/{user_id}/")
async def update_user(
user_id: int,
is_active: Optional[bool] = None,
new_short_name: Optional[str] = None,
new_first_name: Optional[str] = None,
new_last_name: Optional[str] = None,
new_email: Optional[str] = None,
new_team_id: Optional[str] = None,
session: Session = Depends(get_session),
):
"""
Update a user.
Parameters
----------
user_id : int
ID of user to be updated.
is_active : Optional[bool]
Updated status of user.
new_short_name : Optional[bool]
Updated short name of user.
new_first_name : Optional[bool]
Updated first name of user.
new_last_name : Optional[bool]
Updated last name of user.
new_email : Optional[bool]
Updated email of user
new_team_id : Optional[bool]
Updated team id.
session : Session
SQL session that is to be used to update the user.
Defaults to creating a dependency on the running SQL model session.
"""
statement = select(User).where(User.id == user_id)
user_to_update = session.exec(statement).one()
if is_active != None:
user_to_update.is_active = is_active
if new_short_name != None:
user_to_update.short_name = new_short_name
if new_first_name != None:
user_to_update.first_name = new_first_name
if new_last_name != None:
user_to_update.last_name = new_last_name
if new_email != None:
user_to_update.email = new_email
if new_team_id != None:
user_to_update.team_id = new_team_id
user_to_update.updated_at = datetime.now()
session.add(user_to_update)
session.commit()
session.refresh(user_to_update)
return user_to_update