Priority queues are versatile data structures that provide efficient insertion and extraction of elements based on priority. In this comprehensive guide, we will explore various methods to implement priority queues in Python along with optimizations, use cases and design trade-offs.

Table of Contents

  • Overview of Priority Queues
  • Priority Queue Applications
  • Implementations in Python
    • Using Lists
    • Using heapq
    • Using queue.PriorityQueue
  • Time and Space Complexity
  • Benchmarking Performance
  • Optimized Implementations
  • Thread-safe Priority Queues
  • Distributed Priority Queues
  • Role in System Design

What are Priority Queues?

A priority queue is an abstract data type that operates similar to a regular queue except that each element has a "priority" associated with it. The priority of the elements decides the order in which elements are removed from the queue.

The element with the highest priority is removed first. If two elements have the same priority, they may be removed in any order.

Priority Queue

Unlike stacks or queues, priority queues don‘t have a fixed removing order. The element removal order is determined dynamically based on element priority.

Some key characteristics of priority queues:

  • Each element has a priority associated with it
  • The element with highest priority is removed first
  • Elements with equal priorities may be removed in any order
  • Lower priority elements are removed after higher priority elements

The priorities can be numeric or any custom objects that can be compared, like dates, strings etc.

Applications of Priority Queues

Priority queues are very useful in several applications:

1. System Scheduling

The OS scheduler uses priority queues to schedule processes and threads. Higher priority processes like system services get preference over user applications.

Pseudocode:

priority_queue processes; 

function schedule_process(process p) {
  processes.push(p, p.priority) 
}

function next_process() {
   return processes.pop() //returns highest priority process
}

2. Dispatching Tasks

Task queues can use priority queues to process higher priority jobs first. For example, sending automated emails.

priority_queue task_queue;

function add_task(task t) {
   task_queue.put(t, t.priority)  
}   

function get_next_task() {
   return task_queue.get() //returns highest priority task
}

3. Network Traffic

Network routers use priority queues to route packets. Sensitive traffic like video, voice get higher priority over data packets.

4. Graph Algorithms

Algorithms like Dijkstra use priority queues to determine next best path.

5. Emergency Room Triage

Hospital ER can prioritize patients based on severity of conditions using heaps.

These were some examples of priority queue applications. Up next, we will explore Python implementations.

Implementing Priority Queues in Python

There are several methods to implement priority queues in Python:

1. Using Lists

Python lists can serve as simple priority queues by using the sort() method.

"""
Priority Queue Implementation 
using Python Lists and sort() method
"""

class PriorityQueue:
    def __init__(self): 
        self.queue = [] #create empty list

    def push(self, priority, item):
        self.queue.append((priority, item))
        self.queue.sort() 

    def pop(self):
        return self.queue.pop(0) #pop and return highest priority item

# Usage    
myQueue = PriorityQueue() 
myQueue.push(1, "item1")
myQueue.push(3, "item3")
myQueue.push(4, "item4")
myQueue.push(2, "item2") 

while myQueue.queue:
   print(myQueue.pop())

Time Complexity: Insertion – O(nlogn) | Removal – O(n)

This approach provides simple implementation but slow performance for large data.

2. Using heapq Module

The heapq module contains specialized functions to manipulate objects as heap data structure. As heaps provide efficient implementation for priority queues, heapq is very useful.

Internally heapq uses binary heaps to maintain order. We simply invoke heappush() and heappop() methods to insert and remove elements based on priority order.

import heapq

# Priority Queue using heap  
class PriorityQueue:

    def __init__(self):
        self.heap = []

    def push(self, priority, item):
        heapq.heappush(self.heap, (priority, item)) #insertion

    def pop(self):
        return heapq.heappop(self.heap)[1] #return item being popped

# Usage
myQueue = PriorityQueue()  
myQueue.push(1, "item1")
myQueue.push(3, "item3") 
print(myQueue.pop()) #item1 returned

Time Complexity: Insert and Remove both take O(logn) time.

Much faster than list based approach.

3. Using queue.PriorityQueue class

Python queue module contains an inbuilt PriorityQueue class that implements a priority queue using heaps:

from queue import PriorityQueue

# PriorityQueue using inbuilt class 
myQueue = PriorityQueue()

myQueue.put((1, "item1")) #insertion based on priority
myQueue.put((3, "item3"))

while not myQueue.empty():
   print(myQueue.get()[1]) #return item

Provides clean interface, takes care of heap implementations internally.

Time Complexity: Same as heapq based method – O(logn) for insert and remove.

Below is a summary of time complexities of different implementations:

Operation List heapq PriorityQueue
Insert O(nlogn) O(logn) O(logn)
Remove O(n) O(logn) O(logn)

Comparing Priority Queue Performance in Python

To demonstrate performance in action, let‘s benchmark different implementations with varying number of inputs using Python timeit library.

Here is a simple script to compare their performance:

import timeit
import random
import matplotlib.pyplot as plt

# Implementations    
def list_queue(n):
    q = []
    for i in range(n):
        q.append((random.random(), i)) 
    q.sort(reverse=True)

def heap_queue(n):
    import heapq
    q = [] 
    for i in range(n):
        heapq.heappush(q, (random.random(), i))

def pq_queue(n):
    from queue import PriorityQueue
    q = PriorityQueue()
    for i in range(n):
        q.put((random.random(), i))

# Inputs
sizes = [100, 500, 1000, 5000, 10000, 50000, 100000]    

# Benchmark
list_times = []
heap_times = [] 
pq_times = []

for n in sizes:
    list_t = timeit.timeit(lambda: list_queue(n), number=1)
    list_times.append(list_t)

    heap_t = timeit.timeit(lambda: heap_queue(n), number=1) 
    heap_times.append(heap_t)

    pq_t = timeit.timeit(lambda: pq_queue(n), number=1)
    pq_times.append(pq_t)

# Plot graph    
plt.plot(sizes, list_times, label="List") 
plt.plot(sizes, heap_times, label="Heapq")
plt.plot(sizes, pq_times, label="PriorityQueue")
plt.legend()
plt.xlabel("Input data size")
plt.ylabel("Time (s)")

Here is the benchmark result:

Priority Queue Benchmark

Observations:

  • For small sizes (< 1000 items), simple list performs decent
  • As size increases heapq and PriorityQueue scale better with O(logn)
  • List approach gets significantly slower due to quadratic time complexity

So for most practical systems, heapq and PriorityQueue make a good choice!

Optimizing Priority Queue Performance

The basic binary heap structure in Python‘s heapq module and PriorityQueue class provide O(logn) performance. This works well for simpler applications.

However for advanced systems we may need even faster implementations. Some optimizations possible are:

Fibonacci Heaps

Fibonacci heaps provide O(1) time complexity for insertion and O(logn) deletion! Average case can be faster than binary heaps.

However code is much more complex to implement.

Binomial Heaps

Binomial heaps provide better space utilization and O(logn) insertion and deletion. Code complexity is moderate.

Pairing Heaps

Pairing heaps provides simpler implementation compared to Fibonacci with similar performance gains. Bulk deletions can be faster with O(log n) compared to O(n) for binary heaps.

Based on our use cases, several optimizations are possible over the standard heapq structure.

Here is one sample implementation of Pairing Heap for faster priority queue:

import gc

class PairingHeap:
    def __init__(self):
        self.root = None
        self.size = 0

    def merge(self, heap2):
        pass   

    def insert(self, item):
        node = PairingNode(item)
        if self.root is None:
           self.root = node
           self.size += 1
        else:
            self.root = self.combine(self.root, node)
            self.size += 1

    def combine(self, node1, node2):
        if node1 is None:
            return node2
        if node2 is None:
            return node1
        if node1.key < node2.key:
            # node1 has higher priority 
            # so becomes root        
            node1.child = self.combine(node1.child, node2) 
            node1.sibling = None
            return node1
        else:
            node2.child = self.combine(node1, node2.child)
            node2.sibling = None 
            return node2

    def removeMin(self):
        if self.root is None:
            return None        
        item = self.root 
        self.root = self.combineSiblings(self.root.child)
        self.size -= 1
        return item

    def combineSiblings(self, heap):
        if heap is None: 
            return None
        elif heap.sibling is None:
            return heap
        else:
            combined = self.combine(heap, heap.sibling) 
            heap.sibling = None
            return self.combineSiblings(combined) 

class PairingNode:
    def __init__(self, item): 
        self.key = item
        self.sibling = None
        self.child = None

By selecting optimal data structures, priority queue performance can be fine tuned based on actual requirements.

Thread Safe Priority Queue Implementation

In multi-threaded applications, concurrent access to shared priority queues need synchronization to prevent data corruption.

Python‘s PriorityQueue class is not thread safe out of the box.

Here is one way to implement a thread safe priority queue using locks:

from queue import PriorityQueue
import threading 
import time

lock = threading.Lock()

class SafePriorityQueue(PriorityQueue):

    def __init__(self):
        super().__init__()

    def put(self, item, block=True, timeout=None):  
        with lock:
            super().put(item, block, timeout)

    def get(self, block=True, timeout=None):
        with lock:
           return super().get(block, timeout)

# Usage:            
q = SafePriorityQueue()
t1 = threading.Thread(target = q.put, args = [(10, ‘foo‘)]) 
t2 = threading.Thread(target = q.put, args = [(20, ‘bar‘)])
t1.start()
t2.start()

The key idea is to wrap priority queue access within lock to serialize access across threads.

Other alternatives like multiprocessing.Queue can also build thread safe priority queues.

Implementing Distributed Priority Queues

For large scale applications that handle high volumes, a single machine priority queue may not be sufficient.

We can implement distributed priority queues using multiprocessing and dividing data across child worker processes.

Here is an example design:

           Root Process
            /   |     \
           /    |      \
PriorityQueue  PriorityQueue PriorityQueue
   Process          Process         Process


# Root process
def DistPriorityQueue(no_processes):

  # Divide incoming data amongst
  # child priority queue processes
  queues = []   
  for n in range(no_processes):
     p = PriorityQueueProcess()  
     queues.append(p)

  # Receive data 
  data = receive_data()

  # Route each data to least loaded queue
  def route(data):
     least_loaded = find_least_loaded(queues)  
     least_loaded.put(data)  

  # PriorityQueue sub-process  
  class PriorityQueueProcess():
     def __init__(self):
        self.queue = PriorityQueue() 

     def put(self, item):
        self.queue.put(item)

     def get():
         return self.queue.get()

By dividing and parallelizing priority queues across multiple processes, we can achieve:

  • Horizontal scalability
  • Higher throughput
  • Better CPU utilization

Actual implementation will utilize multiprocessing queues for parent-child communication.

Different routing policies can be developed to load balance queues.

Role of Priority Queues in System Design

Priority queues play an important role in the larger system design of services like job queues, task scheduling systems.

Here are some examples:

Task Scheduling Platform

A distributed task execution engine which needs to schedule 100s of tasks with different priorities across worker nodes.

Task Scheduling System

The scheduler service can leverage a priority queue to pick highest priority tasks first. Multiple redundant priority queue servers may be used for high availability.

Job Queue System

A job queue system like Celery that processes 1000s of jobs in a distributed manner. Jobs have different priorities based on creation time and business impact.

Celery architecture:

Celery Architecture

Here priority queues help build the job scheduling logic that determines order of job execution based on priority.

The foundations of an efficient priority queue implementation in Python helps tackle range of complex systems problems like above.

Wrapping Up

We explored various approaches for implementing priority queues in Python using concepts like heaps, multiprocessing etc.

Some key takeaways:

  • Priority queues are versatile for building scheduling systems
  • Python provides efficient methods like heapq, PriorityQueue
  • Optimizations possible using advanced techniques like Pairing heaps
  • Thread safety can be added using locks
  • Scaling achieved by distributing across processes

I hope you enjoyed this comprehensive guide to priority queue data structures and implementations. Please feel free to provide any feedback or questions!

Similar Posts