Ουρές

Πηγαίος κώδικας: Lib/asyncio/queues.py


Οι ουρές asyncio έχουν σχεδιαστεί ώστε να μοιάζουν με τις κλάσεις του module queue. Αν και οι ουρές asyncio δεν είναι ασφαλείς για χρήση με νήματα (thread-safe), έχουν σχεδιαστεί για να χρησιμοποιούνται συγκεκριμένα σε κώδικα async/await.

Σημειώστε ότι οι μέθοδοι των ουρών asyncio δεν διαθέτουν παράμετρο timeout. Χρησιμοποιήστε την συνάρτηση asyncio.wait_for() για να εκτελέσετε λειτουργίες ουράς με χρονικό όριο.

Δείτε επίσης την ενότητα Παραδείγματα παρακάτω.

Ουρά

class asyncio.Queue(maxsize=0)

Μια ουρά τύπου πρώτος που εισέρχεται, πρώτος που εξέρχεται (FIFO).

Αν η τιμή του maxsize είναι λιγότερη ή ίση με το μηδέν, το μέγεθος της ουράς είναι άπειρο. Αν είναι ένας ακέραιος μεγαλύτερος από το 0, τότε η εντολή await put() μπλοκάρει, όταν η ουρά φτάσει το maxsize μέχρι να αφαιρεθεί ένα στοιχείο μέσω της μεθόδου get().

Σε αντίθεση με την ουρά του queue στην βιβλιοθήκη threading, το μέγεθος της ουράς είναι πάντα γνωστό και μπορεί να επιστραφεί καλώντας τη μέθοδο qsize().

Άλλαξε στην έκδοση 3.10: Αφαιρέθηκε η παράμετρος loop.

Αυτή η κλάση είναι not thread safe.

maxsize

Αριθμός στοιχείων που επιτρέπονται στην ουρά.

empty()

Επιστρέφει True αν η ουρά είναι άδεια, διαφορετικά False.

full()

Επιστρέφει True αν υπάρχουν maxsize αντικείμενα στην ουρά.

Αν η ουρά αρχικοποιήθηκε με maxsize=0 (προεπιλογή), τότε η full() δεν επιστρέφει ποτέ True.

async get()

Αφαίρεση και επιστροφή ενός αντικειμένου από την ουρά. Αν η ουρά είναι κενή, περιμένετε μέχρι να είναι διαθέσιμο ένα αντικείμενο.

Κάνει raise μια QueueShutDown αν η ουρά έχει τερματιστεί και είναι κενή, ή αν η ουρά έχει τερματιστεί άμεσα.

get_nowait()

Επιστρέφει ένα αντικείμενο, αν είναι άμεσα διαθέσιμο, αλλιώς κάνε raise την QueueEmpty.

async join()

Αποκλείει μέχρι να ληφθούν και να υποβληθούν σε επεξεργασία όλα τα στοιχεία στην ουρά.

Ο αριθμός των ημιτελών εργασιών αυξάνεται κάθε φορά που προστίθεται ένα αντικείμενο στην ουρά. Ο αριθμός μειώνεται όταν μια καταναλωτική coroutine καλεί τη μέθοδο task_done() για να υποδείξει ότι το αντικείμενο λήφθηκε και η εργασία πάνω του έχει ολοκληρωθεί. Όταν ο αριθμός των ατελείωτων εργασιών μειωθεί στο μηδέν, η μέθοδος join() αποδεσμεύεται.

async put(item)

Τοποθετεί ένα αντικείμενο στην ουρά. Αν η ουρά είναι γεμάτη, περιμένετε μέχρι να είναι διαθέσιμη μια ελεύθερη θέση, πριν προσθέσετε το αντικείμενο.

Κάνει raise μια QueueShutDown αν η ουρά έχει τερματιστεί.

put_nowait(item)

Τοποθετεί ένα αντικείμενο στην ουρά χωρίς να μπλοκάρει.

Αν δεν είναι διαθέσιμη μια ελεύθερη θέση αμέσως, γίνεται raise η QueueFull.

qsize()

Επιστρέφει τον αριθμό των αντικειμένων στην ουρά.

shutdown(immediate=False)

Τερματίζει την ουρά, προκαλώντας την get() και put() κάνει raise την QueueShutDown.

Από προεπιλογή, η get() σε μια τερματισμένη ουρά θα κάνει raise εξαίρεση μόνο όταν η ουρά είναι κενή. Ορίστε το immediate σε true για να κάνετε την get() να εξάγει την εξαίρεση αμέσως αντί για αργότερα.

Όλοι οι αποκλεισμένοι καλούντες των put() και get() θα αποδεσμευτούν. Αν το immediate είναι αληθές, μια εργασία θα χαρακτηριστεί ως ολοκληρωμένη για κάθε εναπομείναν αντικείμενο στην ουρά, το οποίο μπορεί να αποδεσμευτεί στους καλούντες της join().

Added in version 3.13.

task_done()

Υποδεικνύει ότι μια εργασία που είχε προστεθεί στην ουρά έχει ολοκληρωθεί.

Χρησιμοποιείται από τους καταναλωτές της ουράς. Για κάθε κλήση της get() για να ανακτηθεί μια εργασία, μια επακόλουθη κλήση της task_done() ενημερώνει την ουρά ότι η επεξεργασία της εργασίας έχει ολοκληρωθεί.

Εάν μια κλήση της join() μπλοκάρει αυτή την στιγμή, θα συνεχιστεί όταν όλα τα αντικείμενα έχουν επεξεργαστεί (σημαίνει ότι λήφθηκε μια κλήση της task_done() για κάθε αντικείμενο που είχε προστεθεί με put() στην ουρά).

Το shutdown(immediate=True) καλεί τη task_done() για κάθε υπόλοιπο στοιχείο στην ουρά.

Κάνει raise την ValueError εάν κληθεί περισσότερες φορές από όσες τα αντικείμενα που είχαν τοποθετηθεί στην ουρά.

Σειρά Προτεραιότητας

class asyncio.PriorityQueue

Μια παραλλαγή της Queue; η οποία ανακτά τις καταχωρήσεις με σειρά προτεραιότητας (οι χαμηλότερες πρώτες).

Οι καταχωρήσεις είναι συνήθως της μορφής (priority_number, data).

Ουρά LIFO

class asyncio.LifoQueue

Μια παραλλαγή της κλάσης Queue που ανακτά τις πιο πρόσφατα προστιθέμενες καταχωρίσεις πρώτες (με τη λογική τελευταίος μέσα, πρώτος έξω).

Εξαιρέσεις

exception asyncio.QueueEmpty

Αυτή η εξαίρεση γίνεται raise όταν η μέθοδος get_nowait() καλείται σε μια άδεια ουρά.

exception asyncio.QueueFull

Εξαίρεση που γίνεται raise όταν η μέθοδος put_nowait() καλείται σε μια ουρά που έχει φτάσει στο maxsize της.

exception asyncio.QueueShutDown

Εξαίρεση που γίνεται raise όταν η μέθοδος put() ή get() καλείται σε μια ουρά που έχει τερματιστεί.

Added in version 3.13.

Παραδείγματα

Οι ουρές μπορούν να χρησιμοποιηθούν για τη διανομή εργασίας μεταξύ αρκετών παράλληλων εργασιών:

import asyncio
import random
import time


async def worker(name, queue):
    while True:
        # Get a "work item" out of the queue.
        sleep_for = await queue.get()

        # Sleep for the "sleep_for" seconds.
        await asyncio.sleep(sleep_for)

        # Notify the queue that the "work item" has been processed.
        queue.task_done()

        print(f'{name} has slept for {sleep_for:.2f} seconds')


async def main():
    # Create a queue that we will use to store our "workload".
    queue = asyncio.Queue()

    # Generate random timings and put them into the queue.
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)

    # Create three worker tasks to process the queue concurrently.
    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)

    # Wait until the queue is fully processed.
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at

    # Cancel our worker tasks.
    for task in tasks:
        task.cancel()
    # Wait until all worker tasks are cancelled.
    await asyncio.gather(*tasks, return_exceptions=True)

    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')


asyncio.run(main())