I adapted momo's answer to be self contained in a class and without thread-safety (since I was using this in asyncio.PriorityQueue context):
from dataclasses import dataclass, field
from typing import Any, ClassVar
@dataclass(order=True)
class FifoPriorityQueueItem:
data: Any=field(default=None, compare=False)
priority: int=10
sequence: int=field(default_factory=lambda: {0})
counter: ClassVar[int] = 0
def get_data(self):
return self.data
def __post_init__(self):
self.sequence = FifoPriorityQueueItem.next_seq()
@staticmethod
def next_seq():
FifoPriorityQueueItem.counter += 1
return FifoPriorityQueueItem.counter
def main():
import asyncio
print('with FifoPriorityQueueItem is FIFO')
q = asyncio.PriorityQueue()
q.put_nowait(FifoPriorityQueueItem('z'))
q.put_nowait(FifoPriorityQueueItem('y'))
q.put_nowait(FifoPriorityQueueItem('b', priority=1))
q.put_nowait(FifoPriorityQueueItem('x'))
q.put_nowait(FifoPriorityQueueItem('a', priority=1))
while not q.empty():
print(q.get_nowait().get_data())
print('without FifoPriorityQueueItem is no longer FIFO')
q.put_nowait((10, 'z'))
q.put_nowait((10, 'y'))
q.put_nowait((1, 'b'))
q.put_nowait((10, 'x'))
q.put_nowait((1, 'a'))
while not q.empty():
print(q.get_nowait()[1])
if __name__ == '__main__':
main()
Results in:
with FifoPriorityQueueItem is FIFO
b
a
z
y
x
without FifoPriorityQueueItem is no longer FIFO
a
b
x
y
z
self
in code? and on which line error is coming. – Jovialityself
). The error comes from line 6. – Castra