# Source code for debusine.db.models.workers

# Copyright 2019, 2021-2024 The Debusine Developers
# See the AUTHORS file at the top-level directory of this distribution
#
# This file is part of Debusine. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution. No part of Debusine, including this file, may be copied,
# modified, propagated, or distributed except according to the terms
# contained in the LICENSE file.

"""Data models for db workers."""

import copy
import hashlib
from typing import Any, Optional, TYPE_CHECKING

from django.db import IntegrityError, models, transaction
from django.db.models import (
    CheckConstraint,
    Count,
    F,
    JSONField,
    Q,
    QuerySet,
)
from django.utils import timezone
from django.utils.text import slugify

from debusine.db.models.auth import Token

# TypedModelMeta is only imported for static type checking.  At runtime the
# name is bound to plain ``object`` instead, so ``class Meta(TypedModelMeta)``
# still works without importing django_stubs_ext.
if TYPE_CHECKING:
    from django_stubs_ext.db.models import TypedModelMeta
else:
    TypedModelMeta = object


class WorkerManager(models.Manager["Worker"]):
    """Manager for Worker model."""

    def connected(self) -> QuerySet["Worker"]:
        """Return connected workers, ordered by ascending connected_at."""
        return Worker.objects.filter(connected_at__isnull=False).order_by(
            'connected_at'
        )

    def waiting_for_work_request(self) -> QuerySet["Worker"]:
        """
        Return workers that can be assigned a new work request.

        The workers with fewer associated pending or running work requests
        than their concurrency level could take more work right now and are
        thus waiting for a work request.

        Only connected workers are returned.  A worker must either be
        internal or have an enabled token.
        """
        # Import here to prevent circular imports
        from debusine.db.models.work_requests import WorkRequest

        # Number of work requests currently occupying a slot on the worker.
        running_work_request_count = Count(
            'assigned_work_requests',
            filter=Q(
                assigned_work_requests__status__in=[
                    WorkRequest.Statuses.RUNNING,
                    WorkRequest.Statuses.PENDING,
                ]
            ),
        )
        workers = (
            self.connected()
            .annotate(count_running=running_work_request_count)
            .filter(count_running__lt=F("concurrency"))
            .filter(Q(internal=True) | Q(token__enabled=True))
        )

        return workers

    @staticmethod
    def _generate_unique_name(name: str, counter: int) -> str:
        """Return name slugified adding "-counter" if counter != 1."""
        # Dots are turned into dashes first so that the labels of an FQDN
        # stay separated in the slug.
        new_name = slugify(name.replace('.', '-'))

        if counter != 1:
            new_name += f'-{counter}'

        return new_name

    @classmethod
    def create_with_fqdn(cls, fqdn: str, token: Token) -> "Worker":
        """
        Return a new Worker with its name based on fqdn, with token.

        The name is the slugified fqdn.  If that name is already taken,
        "-2", "-3", ... is appended until the creation succeeds.

        Raise IntegrityError if the creation fails for a reason other than
        the name being taken (for example if the token is already
        associated with another worker).
        """
        counter = 1

        while True:
            name = cls._generate_unique_name(fqdn, counter)
            try:
                # The nested atomic block confines a failed INSERT so that
                # further queries can be made after the IntegrityError.
                with transaction.atomic():
                    return Worker.objects.create(
                        name=name, token=token, registered_at=timezone.now()
                    )
            except IntegrityError:
                # Only a name clash can be solved by trying the next suffix.
                # Any other violation would fail on every attempt, so
                # retrying would loop forever: propagate it instead.
                if not Worker.objects.filter(name=name).exists():
                    raise
                counter += 1

    @classmethod
    def get_or_create_celery(cls) -> "Worker":
        """
        Return the Worker representing the Celery task queue.

        The worker is created (internal, named "celery") if it does not
        exist yet.
        """
        # get_or_create() re-fetches the row if a concurrent caller created
        # it between the lookup and the INSERT, instead of failing on the
        # unique name.
        worker, _ = Worker.objects.get_or_create(
            name="celery",
            internal=True,
            defaults={"registered_at": timezone.now()},
        )
        return worker

    def get_worker_by_token_key_or_none(
        self, token_key: str
    ) -> Optional["Worker"]:
        """
        Return a Worker identified by its associated secret token.

        The lookup uses the SHA-256 hex digest of token_key.  Return None
        if no worker matches.
        """
        token_hash = hashlib.sha256(token_key.encode()).hexdigest()
        try:
            return Worker.objects.get(token__hash=token_hash)
        except Worker.DoesNotExist:
            return None

    def get_worker_or_none(self, worker_name: str) -> Optional["Worker"]:
        """Return the worker with worker_name or None."""
        try:
            return self.get(name=worker_name)
        except Worker.DoesNotExist:
            return None


class Worker(models.Model):
    """
    Database model of a worker.

    A worker has a unique name, an optional authentication token, static
    and dynamic metadata, and a concurrency level.
    """

    name = models.SlugField(
        unique=True,
        help_text="Human readable name of the worker based on the FQDN",
    )
    registered_at = models.DateTimeField()
    # Null while the worker is not connected.
    connected_at = models.DateTimeField(blank=True, null=True)

    # Token the Worker authenticates with.  Users have their own tokens:
    # this one is specific to a single worker.
    token = models.OneToOneField(
        Token,
        null=True,
        on_delete=models.PROTECT,
        related_name="worker",
    )
    static_metadata = JSONField(default=dict, blank=True)
    dynamic_metadata = JSONField(default=dict, blank=True)
    dynamic_metadata_updated_at = models.DateTimeField(blank=True, null=True)

    internal = models.BooleanField(default=False)
    concurrency = models.PositiveIntegerField(
        default=1,
        help_text="Number of tasks this worker can run simultaneously",
    )

    class Meta(TypedModelMeta):
        constraints = [
            # A worker without a token is only allowed if it is internal.
            CheckConstraint(
                name="%(app_label)s_%(class)s_internal_or_token",
                check=Q(internal=True) | Q(token__isnull=False),
            )
        ]

    def mark_disconnected(self) -> None:
        """Clear connected_at and save the Worker."""
        self.connected_at = None
        self.save()

    def mark_connected(self) -> None:
        """Set connected_at to the current time and save the Worker."""
        now = timezone.now()
        self.connected_at = now
        self.save()

    def connected(self) -> bool:
        """Return True if connected_at is set."""
        return self.connected_at is not None

    def is_busy(self) -> bool:
        """
        Return True if the Worker cannot take more work requests.

        That is the case when the number of its running or pending work
        requests reaches (or exceeds) its concurrency level.
        """
        # Import here to prevent circular imports
        from debusine.db.models.work_requests import WorkRequest

        running = WorkRequest.objects.running(worker=self)
        pending = WorkRequest.objects.pending(worker=self)
        active = running | pending
        return active.count() >= self.concurrency

    def metadata(self) -> dict[str, Any]:
        """
        Return dynamic_metadata and static_metadata merged in a new dict.

        Both sources are deep-copied.  For keys present in both,
        the value from static_metadata wins.
        """
        dynamic = copy.deepcopy(self.dynamic_metadata)
        static = copy.deepcopy(self.static_metadata)
        # static is unpacked last so that its values override dynamic ones.
        return {**dynamic, **static}

    def set_dynamic_metadata(self, metadata: dict[str, Any]) -> None:
        """Store metadata as dynamic_metadata, timestamp it and save."""
        self.dynamic_metadata = metadata
        self.dynamic_metadata_updated_at = timezone.now()
        self.save()

    def __str__(self) -> str:
        """Return the id and name of the Worker."""
        return f"Id: {self.id} Name: {self.name}"

    objects = WorkerManager()