Ουρές

Πηγαίος κώδικας: Lib/asyncio/queues.py


Οι ουρές asyncio έχουν σχεδιαστεί ώστε να μοιάζουν με τις κλάσεις του module queue. Αν και οι ουρές asyncio δεν είναι ασφαλείς για χρήση με νήματα (thread-safe), έχουν σχεδιαστεί για να χρησιμοποιούνται συγκεκριμένα σε κώδικα async/await.

Σημειώστε ότι οι μέθοδοι των ουρών asyncio δεν διαθέτουν παράμετρο timeout. Χρησιμοποιήστε την συνάρτηση asyncio.wait_for() για να εκτελέσετε λειτουργίες ουράς με χρονικό όριο.

Δείτε επίσης την ενότητα Παραδείγματα παρακάτω.

Ουρά

class asyncio.Queue(maxsize=0, *, loop=None)

Μια ουρά τύπου πρώτος που εισέρχεται, πρώτος που εξέρχεται (FIFO).

Αν η τιμή του maxsize είναι λιγότερη ή ίση με το μηδέν, το μέγεθος της ουράς είναι άπειρο. Αν είναι ένας ακέραιος μεγαλύτερος από το 0, τότε η εντολή await put() μπλοκάρει, όταν η ουρά φτάσει το maxsize μέχρι να αφαιρεθεί ένα στοιχείο μέσω της μεθόδου get().

Σε αντίθεση με την ουρά του queue στην βιβλιοθήκη threading, το μέγεθος της ουράς είναι πάντα γνωστό και μπορεί να επιστραφεί καλώντας τη μέθοδο qsize().

Deprecated since version 3.8, will be removed in version 3.10: The loop parameter.

Αυτή η κλάση είναι not thread safe.

maxsize

Αριθμός στοιχείων που επιτρέπονται στην ουρά.

empty()

Επιστρέφει True αν η ουρά είναι άδεια, διαφορετικά False.

full()

Επιστρέφει True αν υπάρχουν maxsize αντικείμενα στην ουρά.

If the queue was initialized with maxsize=0 (the default), then full() never returns True.

coroutine get()

Αφαίρεση και επιστροφή ενός αντικειμένου από την ουρά. Αν η ουρά είναι κενή, περιμένετε μέχρι να είναι διαθέσιμο ένα αντικείμενο.

get_nowait()

Επιστρέφει ένα αντικείμενο, αν είναι άμεσα διαθέσιμο, αλλιώς κάνε raise την QueueEmpty.

coroutine join()

Αποκλείει μέχρι να ληφθούν και να υποβληθούν σε επεξεργασία όλα τα στοιχεία στην ουρά.

Ο αριθμός των ημιτελών εργασιών αυξάνεται κάθε φορά που προστίθεται ένα αντικείμενο στην ουρά. Ο αριθμός μειώνεται όταν μια καταναλωτική coroutine καλεί τη μέθοδο task_done() για να υποδείξει ότι το αντικείμενο λήφθηκε και η εργασία πάνω του έχει ολοκληρωθεί. Όταν ο αριθμός των ατελείωτων εργασιών μειωθεί στο μηδέν, η μέθοδος join() αποδεσμεύεται.

coroutine put(item)

Τοποθετεί ένα αντικείμενο στην ουρά. Αν η ουρά είναι γεμάτη, περιμένετε μέχρι να είναι διαθέσιμη μια ελεύθερη θέση, πριν προσθέσετε το αντικείμενο.

put_nowait(item)

Τοποθετεί ένα αντικείμενο στην ουρά χωρίς να μπλοκάρει.

Αν δεν είναι διαθέσιμη μια ελεύθερη θέση αμέσως, γίνεται raise η QueueFull.

qsize()

Επιστρέφει τον αριθμό των αντικειμένων στην ουρά.

task_done()

Indicate that a formerly enqueued task is complete.

Used by queue consumers. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

Εάν μια κλήση της join() μπλοκάρει αυτή την στιγμή, θα συνεχιστεί όταν όλα τα αντικείμενα έχουν επεξεργαστεί (σημαίνει ότι λήφθηκε μια κλήση της task_done() για κάθε αντικείμενο που είχε προστεθεί με put() στην ουρά).

Κάνει raise την ValueError εάν κληθεί περισσότερες φορές από όσες τα αντικείμενα που είχαν τοποθετηθεί στην ουρά.

Σειρά Προτεραιότητας

class asyncio.PriorityQueue

Μια παραλλαγή της Queue; η οποία ανακτά τις καταχωρήσεις με σειρά προτεραιότητας (οι χαμηλότερες πρώτες).

Οι καταχωρήσεις είναι συνήθως της μορφής (priority_number, data).

Ουρά LIFO

class asyncio.LifoQueue

Μια παραλλαγή της κλάσης Queue που ανακτά τις πιο πρόσφατα προστιθέμενες καταχωρίσεις πρώτες (με τη λογική τελευταίος μέσα, πρώτος έξω).

Εξαιρέσεις

exception asyncio.QueueEmpty

Αυτή η εξαίρεση γίνεται raise όταν η μέθοδος get_nowait() καλείται σε μια άδεια ουρά.

exception asyncio.QueueFull

Εξαίρεση που γίνεται raise όταν η μέθοδος put_nowait() καλείται σε μια ουρά που έχει φτάσει στο maxsize της.

Παραδείγματα

Οι ουρές μπορούν να χρησιμοποιηθούν για τη διανομή εργασίας μεταξύ αρκετών παράλληλων εργασιών:

import asyncio
import random
import time


async def worker(name, queue):
    while True:
        # Get a "work item" out of the queue.
        sleep_for = await queue.get()

        # Sleep for the "sleep_for" seconds.
        await asyncio.sleep(sleep_for)

        # Notify the queue that the "work item" has been processed.
        queue.task_done()

        print(f'{name} has slept for {sleep_for:.2f} seconds')


async def main():
    # Create a queue that we will use to store our "workload".
    queue = asyncio.Queue()

    # Generate random timings and put them into the queue.
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)

    # Create three worker tasks to process the queue concurrently.
    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)

    # Wait until the queue is fully processed.
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at

    # Cancel our worker tasks.
    for task in tasks:
        task.cancel()
    # Wait until all worker tasks are cancelled.
    await asyncio.gather(*tasks, return_exceptions=True)

    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')


asyncio.run(main())