Source code for reana_db.utils

# -*- coding: utf-8 -*-
#
# This file is part of REANA.
# Copyright (C) 2018, 2019, 2020, 2021, 2022, 2023 CERN.
#
# REANA is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.
"""REANA-DB utils."""

import logging
import os
from datetime import datetime, timedelta
from typing import Optional
from uuid import UUID

from sqlalchemy import inspect, func
from sqlalchemy.orm import defer
from reana_commons.utils import get_disk_usage
from reana_commons.errors import REANAMissingWorkspaceError

from reana_db.config import (
    PERIODIC_RESOURCE_QUOTA_UPDATE_POLICY,
    WORKFLOW_TERMINATION_QUOTA_UPDATE_POLICY,
)


[docs]def build_workspace_path(user_id, workflow_id=None, workspace_root_path=None): """Build user's workspace relative path. :param user_id: Owner of the workspace. :param workflow_id: Optional parameter, if provided gives the path to the workflow workspace instead of just the path to the user workspace. :param workspace_root_path: Optional parameter, if provided changes the root path under which the workflow workspaces are stored. :return: String that represents the workspace absolute path. i.e. /var/reana/users/0000/workflows/0034 """ from reana_commons.config import DEFAULT_WORKSPACE_PATH, SHARED_VOLUME_PATH users_dir = os.path.join("users", str(user_id), "workflows") if workspace_root_path: workspace_path = workspace_root_path # in case shared volume is used in workspace path use the default directory if SHARED_VOLUME_PATH in workspace_root_path: workspace_path = os.path.join(SHARED_VOLUME_PATH, users_dir) else: workspace_path = os.path.join(DEFAULT_WORKSPACE_PATH, users_dir) if workflow_id: workspace_path = os.path.join(workspace_path, str(workflow_id)) return workspace_path
[docs]def split_run_number(run_number): """Split run number into major and minor run numbers.""" run_number = str(run_number) if "." in run_number: return tuple(map(int, run_number.split(".", maxsplit=1))) return int(run_number), 0
def _get_workflow_with_uuid_or_name(uuid_or_name, user_uuid): """Get Workflow from database with uuid or name. :param uuid_or_name: String representing a valid UUIDv4 or valid Workflow name. Valid name contains only ASCII alphanumerics. Name might be in format 'reana.workflow.123' with arbitrary number of dot-delimited substrings, where last substring specifies the run number of the workflow this workflow name refers to. If name does not contain a valid run number, but it is a valid name, workflow with latest run number of all the workflows with this name is returned. :type uuid_or_name: String :param user_uuid: UUID of the workflow's owner. :rtype: reana-db.models.Workflow """ from reana_db.models import Workflow # Check existence if not uuid_or_name: raise ValueError("No Workflow was specified.") # Check validity try: uuid_or_name.encode("ascii") except UnicodeEncodeError: # `workflow_name` contains something else than just ASCII. raise ValueError("Workflow name {} is not valid.".format(uuid_or_name)) # Check if UUIDv4 try: # is_uuid = UUID(uuid_or_name, version=4) is_uuid = UUID("{" + uuid_or_name + "}", version=4) except (TypeError, ValueError): is_uuid = None if is_uuid: # `uuid_or_name` is an UUIDv4. # Search with it since it is expected to be unique. return _get_workflow_by_uuid(uuid_or_name, user_uuid) else: # `uuid_or_name` is not and UUIDv4. Expect it is a name. # Expect name might be in format 'reana.workflow.123' with arbitrary # number of dot-delimited substring, where last substring specifies # the run_number of the workflow this workflow name refers to. # Possible candidates for names are e.g. : # 'workflow_name' -> ValueError # 'workflow.name' -> True, True # 'workflow.name.123' -> True, True # '123.' -> True, False # '' -> ValueError # '.123' -> False, True # '..' -> False, False # '123.12' -> True, True # '123.12.' -> True, False # Try to split the dot-separated string. try: workflow_name, run_number = uuid_or_name.split(".", maxsplit=1) except ValueError: # Couldn't split. Probably not a dot-separated string. # -> Search with `uuid_or_name` return _get_workflow_by_name(uuid_or_name, user_uuid) # Check if `run_number` was specified if not run_number: # No `run_number` specified. # -> Search by `workflow_name` return _get_workflow_by_name(workflow_name, user_uuid) # `run_number` was specified. try: run_number_major, run_number_minor = split_run_number(run_number) except ValueError: # The specified `run_number` is not valid. # Assume that this string is the name of # the workflow and search with it. return _get_workflow_by_name(uuid_or_name, user_uuid) # `run_number` is valid. # Search by `run_number_major` and `run_number_minor`, since it is a primary key. workflow = Workflow.query.filter( Workflow.name == workflow_name, Workflow.run_number_major == run_number_major, Workflow.run_number_minor == run_number_minor, Workflow.owner_id == user_uuid, ).one_or_none() if not workflow: raise ValueError( "REANA_WORKON is set to {0}, but " "that workflow does not exist. " "Please set your REANA_WORKON environment " "variable appropriately.".format(workflow_name) ) return workflow def _get_workflow_by_name(workflow_name, user_uuid): """From Workflows named as `workflow_name` the latest run_number. Only use when you are sure that workflow_name is not UUIDv4. :rtype: reana-db.models.Workflow """ from reana_db.models import Workflow workflow = ( Workflow.query.filter( Workflow.name == workflow_name, Workflow.owner_id == user_uuid ) .order_by(Workflow.run_number_major.desc(), Workflow.run_number_minor.desc()) .first() ) if not workflow: raise ValueError( "REANA_WORKON is set to {0}, but " "that workflow does not exist. " "Please set your REANA_WORKON environment " "variable appropriately.".format(workflow_name) ) return workflow def _get_workflow_by_uuid(workflow_uuid, user_uuid): """Get Workflow with UUIDv4. :param workflow_uuid: UUIDv4 of a Workflow. :type workflow_uuid: String representing a valid UUIDv4. :param user_uuid: UUID of the workflow's owner. :rtype: reana-db.models.Workflow """ from reana_db.models import Workflow workflow = Workflow.query.filter( Workflow.id_ == workflow_uuid, Workflow.owner_id == user_uuid ).first() if not workflow: raise ValueError( "REANA_WORKON is set to {0}, but " "that workflow does not exist. " "Please set your REANA_WORKON environment " "variable appropriately.".format(workflow_uuid) ) return workflow
[docs]class Timer: """Timer to time events and log periodic progress.""" def __init__(self, name=None, total=None, periodic_delta=100) -> None: """Initialise new Timer.""" self.name = name self.total = total self.periodic_delta = periodic_delta self.count = 0 self.start = datetime.now()
[docs] def elapsed(self) -> float: """Elapsed time since the creation of the Timer, in seconds.""" diff = datetime.now() - self.start return diff.total_seconds()
[docs] def estimated_total(self) -> float: """Estimated total time, in seconds.""" if not self.total or not self.count: return 0 return self.elapsed() * self.total / self.count
[docs] def per_event(self) -> float: """Time per event, in seconds.""" if self.count == 0: return 0 return self.elapsed() / self.count
[docs] def log_progress(self) -> None: """Log progress of events.""" progress = "" if self.name: progress = f"{self.name} " progress += f"progress: {self.count}" if self.total: progress += f"/{self.total}" progress += ( f" elapsed: {self.elapsed():.3f}s" f" est.total: {self.estimated_total():.3f}s" f" per event: {self.per_event():.3f}s" ) logging.info(progress)
[docs] def log_periodic_progress(self) -> None: """Periodically log progress of events. Progress is logged periodically after a given amount of events and when all the events are completed. """ if self.count != self.total and self.count % self.periodic_delta != 0: return self.log_progress()
[docs] def count_event(self) -> None: """Count a new event.""" self.count += 1 self.log_periodic_progress()
[docs]def get_default_quota_resource(resource_type): """ Get default quota resource by given resource type. :param resource_type: Resource type corresponding to default resource to get. :type resource_type: reana_db.models.ResourceType """ from reana_db.config import DEFAULT_QUOTA_RESOURCES from reana_db.models import Resource if resource_type not in DEFAULT_QUOTA_RESOURCES.keys(): raise Exception( "Default resource of type {} does not exist.".format(resource_type) ) return Resource.query.filter_by(name=DEFAULT_QUOTA_RESOURCES[resource_type]).one()
[docs]def should_skip_quota_update(resource_type) -> bool: """Check if quota updates should be skipped based on the update policy. :param resource_type: Resource type of the quota that needs to be updated. """ return ( resource_type.name not in WORKFLOW_TERMINATION_QUOTA_UPDATE_POLICY and not PERIODIC_RESOURCE_QUOTA_UPDATE_POLICY )
[docs]def update_users_disk_quota( user=None, bytes_to_sum: Optional[int] = None, override_policy_checks: bool = False ) -> None: """Update users disk quota usage. User disk quota usage will be calculated from the individual workflow disk quota usage numbers, so this function should be typically called only after ``update_workflows_disk_quota()``. :param user: User whose disk quota will be updated. If None, applies to all users. :param bytes_to_sum: Amount of bytes to sum to user disk quota, if None, `du` will be used to recalculate it. :type user: reana_db.models.User :type bytes_to_sum: int :param override_policy_checks: Whether to update the disk quota without checking the update policy. """ from reana_db.database import Session from reana_db.models import ( Workflow, WorkflowResource, ResourceType, User, UserResource, ) if not override_policy_checks and should_skip_quota_update(ResourceType.disk): return disk_resource = get_default_quota_resource(ResourceType.disk.name) users = [user] if user else User.query.all() timer = Timer("User disk quota usage update", total=len(users)) for u in users: user_resource_quota = UserResource.query.filter_by( user_id=u.id_, resource_id=disk_resource.id_ ).one() if bytes_to_sum is not None: updated_quota_usage = user_resource_quota.quota_used + bytes_to_sum if updated_quota_usage < 0: logging.warning( f"Disk quota consumption of user {u.id_} would become negative: " f"{user_resource_quota.quota_used} [original usage] + {bytes_to_sum} [delta] " f"-> {updated_quota_usage} [new usage]. Setting the new usage to zero." ) user_resource_quota.quota_used = 0 else: user_resource_quota.quota_used = updated_quota_usage else: # get the size of each workspace of each workflow of the given user size_per_workspace = ( Session.query( Workflow.workspace_path, func.max(WorkflowResource.quota_used).label("quota_used"), ) .filter(WorkflowResource.workflow_id == Workflow.id_) .filter(WorkflowResource.resource_id == disk_resource.id_) .filter(Workflow.id_.in_(Session.query(u.workflows.subquery().c.id_))) # multiple workflows might have the same workspace path, so the query groups # by `workspace_path` in order to consider each workspace only once .group_by(Workflow.workspace_path) .subquery() ) disk_usage_bytes = Session.query( func.sum(size_per_workspace.c.quota_used) ).scalar() if not disk_usage_bytes: disk_usage_bytes = 0 user_resource_quota.quota_used = disk_usage_bytes Session.commit() timer.count_event()
[docs]def update_workflow_cpu_quota(workflow) -> int: """Update workflow CPU quota based on started and finished/stopped times. :return: Workflow running time in milliseconds if workflow has terminated, else 0. """ from reana_db.database import Session from reana_db.models import ( ResourceType, UserResource, WorkflowResource, ) if should_skip_quota_update(ResourceType.cpu): return cpu_resource = get_default_quota_resource(ResourceType.cpu.name) terminated_at = workflow.run_finished_at or workflow.run_stopped_at if workflow.run_started_at and terminated_at: cpu_time = terminated_at - workflow.run_started_at cpu_milliseconds = int(cpu_time.total_seconds() * 1000) # WorkflowResource might exist already if the cluster # follows a combined termination + periodic policy (eg. created # by the status listener, revisited by the cronjob) workflow_resource = WorkflowResource.query.filter_by( workflow_id=workflow.id_, resource_id=cpu_resource.id_ ).one_or_none() if workflow_resource: workflow_resource.quota_used = cpu_milliseconds else: workflow_resource = WorkflowResource( workflow_id=workflow.id_, resource_id=cpu_resource.id_, quota_used=cpu_milliseconds, ) Session.add(workflow_resource) Session.commit() return cpu_milliseconds return 0
[docs]def update_workflows_cpu_quota() -> None: """Update the CPU quotas of all workflows in a more efficient way.""" from reana_db.database import Session from reana_db.models import Workflow # logs and reana_specification are not loaded to avoid consuming # huge amounts of memory workflows = Workflow.query.options( defer(Workflow.logs), defer(Workflow.reana_specification) ).all() # We expunge all the workflows, as they will not be modified when updating the quotas. # This makes `Session.commit()` much faster for workflow in workflows: Session.expunge(workflow) timer = Timer("Workflow CPU quota usage update", total=len(workflows)) for workflow in workflows: update_workflow_cpu_quota(workflow) timer.count_event()
[docs]def update_users_cpu_quota(user=None) -> None: """Update users CPU quota usage. User CPU quotas will be calculated from workflow CPU quotas, so the latter should be updated before the former. :param user: User whose CPU quota will be updated. If None, applies to all users. :type user: reana_db.models.User """ from reana_db.database import Session from reana_db.models import ( ResourceType, User, UserResource, UserToken, UserTokenStatus, WorkflowResource, ) if should_skip_quota_update(ResourceType.cpu): return cpu_resource = get_default_quota_resource(ResourceType.cpu.name) if user: users = [user] else: users = ( User.query.join(UserToken) .filter_by(status=UserTokenStatus.active) # skip users with no active token .all() ) timer_user = Timer("User CPU quota usage update", total=len(users)) for user in users: cpu_milliseconds = ( Session.query(func.sum(WorkflowResource.quota_used)) .filter(WorkflowResource.resource_id == cpu_resource.id_) .join(user.workflows.subquery()) .scalar() ) if not cpu_milliseconds: cpu_milliseconds = 0 user_resource_quota = UserResource.query.filter_by( user_id=user.id_, resource_id=cpu_resource.id_ ).first() user_resource_quota.quota_used = cpu_milliseconds Session.commit() timer_user.count_event()
[docs]def update_workspace_retention_rules(rules, status) -> None: """Update workspace retention rules status. :param rules: Workspace retention rules that need to be updated :param status: Status accoring which retention rules need to be updated :type rules: reana_db.models.WorkspaceRetentionRule :type status: reana_db.models.WorkspaceRetentionRuleStatus """ from reana_db.database import Session from reana_db.models import WorkspaceRetentionRuleStatus for rule in rules: if rule.status == status: continue if not rule.can_transition_to(status): raise Exception( f"Cannot transition workspace retention rule {rule.id_} " f"from status {rule.status} to {status}." ) if status == WorkspaceRetentionRuleStatus.inactive: rule.apply_on = None if status == WorkspaceRetentionRuleStatus.active: rule.apply_on = datetime.today().replace( hour=23, minute=59, second=59 ) + timedelta(days=rule.retention_days) rule.status = status Session.add(rule) Session.commit()
[docs]def get_disk_usage_or_zero(workspace_path) -> int: """Get disk usage for the workspace if exists, zero if not.""" from reana_db.models import ResourceType if ( ResourceType.disk.name not in WORKFLOW_TERMINATION_QUOTA_UPDATE_POLICY and not PERIODIC_RESOURCE_QUOTA_UPDATE_POLICY ): return 0 try: disk_bytes = get_disk_usage(workspace_path, summarize=True) return int(disk_bytes[0]["size"]["raw"]) except REANAMissingWorkspaceError: return 0
[docs]def store_workflow_disk_quota( workflow, bytes_to_sum: Optional[int] = None, override_policy_checks: bool = False ): """ Update or create disk workflow resource. :param workflow: Workflow whose disk resource usage must be calculated. :param bytes_to_sum: Amount of bytes to sum to workflow disk quota, if None, `du` will be used to recalculate it. :type workflow: reana_db.models.Workflow :type bytes_to_sum: int :param override_policy_checks: Whether to update the disk quota without checking the update policy. """ from reana_db.database import Session from reana_db.models import ResourceType, WorkflowResource if not override_policy_checks and should_skip_quota_update(ResourceType.disk): return disk_resource = get_default_quota_resource(ResourceType.disk.name) workflow_resource = ( Session.query(WorkflowResource) .filter_by(workflow_id=workflow.id_, resource_id=disk_resource.id_) .one_or_none() ) if workflow_resource: if bytes_to_sum is not None: updated_quota_usage = workflow_resource.quota_used + bytes_to_sum if updated_quota_usage < 0: logging.warning( f"Disk quota consumption of workflow {workflow.id_} would become negative: " f"{workflow_resource.quota_used} [original usage] + {bytes_to_sum} [delta] " f"-> {updated_quota_usage} [new usage]. Setting the new usage to zero." ) workflow_resource.quota_used = 0 else: workflow_resource.quota_used = updated_quota_usage else: workflow_resource.quota_used = get_disk_usage_or_zero( workflow.workspace_path ) Session.commit() else: workflow_resource = WorkflowResource( workflow_id=workflow.id_, resource_id=disk_resource.id_, quota_used=get_disk_usage_or_zero(workflow.workspace_path), ) Session.add(workflow_resource) Session.commit() return workflow_resource
[docs]def update_workflows_disk_quota() -> None: """Update the disk quotas of all workflows in a more efficient way.""" from reana_db.database import Session from reana_db.models import Workflow # logs and reana_specification are not loaded to avoid consuming # huge amounts of memory workflows = Workflow.query.options( defer(Workflow.logs), defer(Workflow.reana_specification) ).all() # We expunge all the workflows, as they will not be modified when updating the quotas. # This makes `Session.commit()` much faster for workflow in workflows: Session.expunge(workflow) timer = Timer("Workflow disk quota usage update", total=len(workflows)) for workflow in workflows: store_workflow_disk_quota(workflow) timer.count_event()