Ουρές¶
Πηγαίος κώδικας: Lib/asyncio/queues.py
Οι ουρές asyncio έχουν σχεδιαστεί ώστε να μοιάζουν με τις κλάσεις του module queue
. Αν και οι ουρές asyncio δεν είναι ασφαλείς για χρήση με νήματα (thread-safe), έχουν σχεδιαστεί για να χρησιμοποιούνται συγκεκριμένα σε κώδικα async/await.
Σημειώστε ότι οι μέθοδοι των ουρών asyncio δεν διαθέτουν παράμετρο timeout. Χρησιμοποιήστε την συνάρτηση asyncio.wait_for()
για να εκτελέσετε λειτουργίες ουράς με χρονικό όριο.
Δείτε επίσης την ενότητα Παραδείγματα παρακάτω.
Ουρά¶
- class asyncio.Queue(maxsize=0)¶
Μια ουρά τύπου πρώτος που εισέρχεται, πρώτος που εξέρχεται (FIFO).
Αν η τιμή του maxsize είναι λιγότερη ή ίση με το μηδέν, το μέγεθος της ουράς είναι άπειρο. Αν είναι ένας ακέραιος μεγαλύτερος από το
0
, τότε η εντολήawait put()
μπλοκάρει, όταν η ουρά φτάσει το maxsize μέχρι να αφαιρεθεί ένα στοιχείο μέσω της μεθόδουget()
.Σε αντίθεση με την ουρά του
queue
στην βιβλιοθήκη threading, το μέγεθος της ουράς είναι πάντα γνωστό και μπορεί να επιστραφεί καλώντας τη μέθοδοqsize()
.Άλλαξε στην έκδοση 3.10: Αφαιρέθηκε η παράμετρος loop.
Αυτή η κλάση είναι not thread safe.
- maxsize¶
Αριθμός στοιχείων που επιτρέπονται στην ουρά.
- empty()¶
Επιστρέφει
True
αν η ουρά είναι άδεια, διαφορετικάFalse
.
- full()¶
Επιστρέφει
True
αν υπάρχουνmaxsize
αντικείμενα στην ουρά.Αν η ουρά αρχικοποιήθηκε με
maxsize=0
(προεπιλογή), τότε ηfull()
δεν επιστρέφει ποτέTrue
.
- async get()¶
Αφαίρεση και επιστροφή ενός αντικειμένου από την ουρά. Αν η ουρά είναι κενή, περιμένετε μέχρι να είναι διαθέσιμο ένα αντικείμενο.
Κάνει raise μια
QueueShutDown
αν η ουρά έχει τερματιστεί και είναι κενή, ή αν η ουρά έχει τερματιστεί άμεσα.
- get_nowait()¶
Επιστρέφει ένα αντικείμενο, αν είναι άμεσα διαθέσιμο, αλλιώς κάνε raise την
QueueEmpty
.
- async join()¶
Αποκλείει μέχρι να ληφθούν και να υποβληθούν σε επεξεργασία όλα τα στοιχεία στην ουρά.
Ο αριθμός των ημιτελών εργασιών αυξάνεται κάθε φορά που προστίθεται ένα αντικείμενο στην ουρά. Ο αριθμός μειώνεται όταν μια καταναλωτική coroutine καλεί τη μέθοδο
task_done()
για να υποδείξει ότι το αντικείμενο λήφθηκε και η εργασία πάνω του έχει ολοκληρωθεί. Όταν ο αριθμός των ατελείωτων εργασιών μειωθεί στο μηδέν, η μέθοδοςjoin()
αποδεσμεύεται.
- async put(item)¶
Τοποθετεί ένα αντικείμενο στην ουρά. Αν η ουρά είναι γεμάτη, περιμένετε μέχρι να είναι διαθέσιμη μια ελεύθερη θέση, πριν προσθέσετε το αντικείμενο.
Κάνει raise μια
QueueShutDown
αν η ουρά έχει τερματιστεί.
- put_nowait(item)¶
Τοποθετεί ένα αντικείμενο στην ουρά χωρίς να μπλοκάρει.
Αν δεν είναι διαθέσιμη μια ελεύθερη θέση αμέσως, γίνεται raise η
QueueFull
.
- qsize()¶
Επιστρέφει τον αριθμό των αντικειμένων στην ουρά.
- shutdown(immediate=False)¶
Put a
Queue
instance into a shutdown mode.The queue can no longer grow. Future calls to
put()
raiseQueueShutDown
. Currently blocked callers ofput()
will be unblocked and will raiseQueueShutDown
in the formerly blocked thread.If immediate is false (the default), the queue can be wound down normally with
get()
calls to extract tasks that have already been loaded.And if
task_done()
is called for each remaining task, a pendingjoin()
will be unblocked normally.Once the queue is empty, future calls to
get()
will raiseQueueShutDown
.If immediate is true, the queue is terminated immediately. The queue is drained to be completely empty and the count of unfinished tasks is reduced by the number of tasks drained. If unfinished tasks is zero, callers of
join()
are unblocked. Also, blocked callers ofget()
are unblocked and will raiseQueueShutDown
because the queue is empty.Use caution when using
join()
with immediate set to true. This unblocks the join even when no work has been done on the tasks, violating the usual invariant for joining a queue.Added in version 3.13.
- task_done()¶
Υποδεικνύει ότι μια εργασία που είχε προστεθεί στην ουρά έχει ολοκληρωθεί.
Χρησιμοποιείται από τους καταναλωτές της ουράς. Για κάθε κλήση της
get()
για να ανακτηθεί μια εργασία, μια επακόλουθη κλήση τηςtask_done()
ενημερώνει την ουρά ότι η επεξεργασία της εργασίας έχει ολοκληρωθεί.Εάν μια κλήση της
join()
μπλοκάρει αυτή την στιγμή, θα συνεχιστεί όταν όλα τα αντικείμενα έχουν επεξεργαστεί (σημαίνει ότι λήφθηκε μια κλήση τηςtask_done()
για κάθε αντικείμενο που είχε προστεθεί μεput()
στην ουρά).Κάνει raise την
ValueError
εάν κληθεί περισσότερες φορές από όσες τα αντικείμενα που είχαν τοποθετηθεί στην ουρά.
Σειρά Προτεραιότητας¶
Ουρά LIFO¶
Εξαιρέσεις¶
- exception asyncio.QueueEmpty¶
Αυτή η εξαίρεση γίνεται raise όταν η μέθοδος
get_nowait()
καλείται σε μια άδεια ουρά.
- exception asyncio.QueueFull¶
Εξαίρεση που γίνεται raise όταν η μέθοδος
put_nowait()
καλείται σε μια ουρά που έχει φτάσει στο maxsize της.
Παραδείγματα¶
Οι ουρές μπορούν να χρησιμοποιηθούν για τη διανομή εργασίας μεταξύ αρκετών παράλληλων εργασιών:
import asyncio
import random
import time
async def worker(name, queue):
while True:
# Get a "work item" out of the queue.
sleep_for = await queue.get()
# Sleep for the "sleep_for" seconds.
await asyncio.sleep(sleep_for)
# Notify the queue that the "work item" has been processed.
queue.task_done()
print(f'{name} has slept for {sleep_for:.2f} seconds')
async def main():
# Create a queue that we will use to store our "workload".
queue = asyncio.Queue()
# Generate random timings and put them into the queue.
total_sleep_time = 0
for _ in range(20):
sleep_for = random.uniform(0.05, 1.0)
total_sleep_time += sleep_for
queue.put_nowait(sleep_for)
# Create three worker tasks to process the queue concurrently.
tasks = []
for i in range(3):
task = asyncio.create_task(worker(f'worker-{i}', queue))
tasks.append(task)
# Wait until the queue is fully processed.
started_at = time.monotonic()
await queue.join()
total_slept_for = time.monotonic() - started_at
# Cancel our worker tasks.
for task in tasks:
task.cancel()
# Wait until all worker tasks are cancelled.
await asyncio.gather(*tasks, return_exceptions=True)
print('====')
print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
asyncio.run(main())