Threading lets you run code concurrently. Here's how to use it safely in Python.
Basic Threads
import threading
import time


def worker(name, delay):
    """Announce start, sleep for *delay* seconds, then announce completion."""
    print(f"{name} starting")
    time.sleep(delay)
    print(f"{name} done")


# Create and start threads
t1 = threading.Thread(target=worker, args=("Thread-1", 2))
t2 = threading.Thread(target=worker, args=("Thread-2", 1))
for thread in (t1, t2):
    thread.start()

# Wait for completion
for thread in (t1, t2):
    thread.join()
print("All done")

Thread with Return Value
import threading


class ResultThread(threading.Thread):
    """A Thread subclass that captures the target's return value.

    Plain ``threading.Thread`` discards the target's return value; this
    variant stores it in ``self.result`` after ``join()`` completes.
    """

    def __init__(self, func, args=()):
        super().__init__()
        self.func = func
        self.args = args
        self.result = None  # Filled in by run() in the worker thread

    def run(self):
        # Executes in the new thread; stash the result for the caller.
        self.result = self.func(*self.args)


def compute(x):
    """Return the square of *x*."""
    return x * x


t = ResultThread(compute, (5,))
t.start()
t.join()
print(t.result)  # 25

Daemon Threads
import threading
import time


def background_task():
    """Print a heartbeat once per second, forever."""
    while True:
        print("Background running...")
        time.sleep(1)


# Daemon threads are killed when the main thread exits
t = threading.Thread(target=background_task, daemon=True)
t.start()

time.sleep(3)
print("Main thread exiting")
# Daemon thread is automatically terminated

Locks
import threading

counter = 0
lock = threading.Lock()


def increment():
    """Add 1 to the shared counter 100,000 times, one locked step at a time."""
    global counter
    for _ in range(100000):
        # The lock makes the read-modify-write of `counter` atomic across
        # threads; without it, concurrent updates would be lost.
        with lock:  # Thread-safe
            counter += 1


threads = [threading.Thread(target=increment) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)  # 500000 (correct)

RLock (Reentrant Lock)
import threading

rlock = threading.RLock()


def outer():
    """Acquire the reentrant lock, then call inner() while still holding it."""
    with rlock:
        print("Outer acquired")
        inner()  # Can acquire same lock again


def inner():
    """Re-acquire the lock already held by this thread."""
    with rlock:
        print("Inner acquired")
        # Regular Lock would deadlock here
outer()

Condition Variables
import threading
from collections import deque

# Shared buffer guarded by the condition below.  Renamed from ``queue`` so it
# does not shadow the stdlib ``queue`` module (imported elsewhere in this
# file); a deque gives O(1) pops from the left where list.pop(0) is O(n).
buffer = deque()
condition = threading.Condition()


def producer():
    """Append 0..4 to the shared buffer, waking the consumer after each one."""
    for i in range(5):
        with condition:
            buffer.append(i)
            print(f"Produced {i}")
            condition.notify()


def consumer():
    """Drain the buffer as items arrive; stop after the final item (4)."""
    while True:
        with condition:
            # wait() atomically releases the lock while blocked and
            # re-acquires it on wakeup; re-check the predicate in a loop
            # to guard against spurious wakeups.
            while not buffer:
                condition.wait()
            item = buffer.popleft()
            print(f"Consumed {item}")
        if item == 4:
            break


t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t2.start()
t1.start()
t1.join()
t2.join()

Events
import threading
import time

event = threading.Event()


def waiter():
    """Block until the shared event is set."""
    print("Waiting for event...")
    event.wait()
    print("Event received!")


def setter():
    """Sleep two seconds, then set the event, releasing every waiter."""
    time.sleep(2)
    print("Setting event")
    event.set()


t1 = threading.Thread(target=waiter)
t2 = threading.Thread(target=setter)
for t in (t1, t2):
    t.start()
for t in (t1, t2):
    t.join()

# Reset event
event.clear()

Semaphore
import threading
import time

# Limit concurrent access: at most three threads hold the semaphore at once.
semaphore = threading.Semaphore(3)


def limited_resource(name):
    """Hold one of the three semaphore slots for one second."""
    with semaphore:
        print(f"{name} acquired")
        time.sleep(1)
        print(f"{name} released")


threads = [
    threading.Thread(target=limited_resource, args=(f"Thread-{i}",))
    for i in range(10)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

Queue (Thread-Safe)
import threading
import queue
import time

q = queue.Queue()


def producer():
    """Put 0..4 on the queue (0.1 s apart), then a None sentinel."""
    for i in range(5):
        q.put(i)
        print(f"Produced {i}")
        time.sleep(0.1)
    q.put(None)  # Sentinel: tells the consumer to stop


def consumer():
    """Consume items until the None sentinel arrives."""
    while True:
        item = q.get()
        if item is None:
            # Mark the sentinel as done too; otherwise q.join() would
            # hang forever waiting on one outstanding task.
            q.task_done()
            break
        print(f"Consumed {item}")
        q.task_done()


t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start()
t2.start()
t1.join()
t2.join()

ThreadPoolExecutor
from concurrent.futures import ThreadPoolExecutor, as_completed
import time


def task(n):
    """Sleep *n* seconds, then return n squared."""
    time.sleep(n)
    return n * n


# Submit tasks
with ThreadPoolExecutor(max_workers=4) as executor:
    pending = [executor.submit(task, i) for i in range(5)]
    # Get results as they complete
    for finished in as_completed(pending):
        print(f"Result: {finished.result()}")
# Map pattern
with ThreadPoolExecutor(max_workers=4) as executor:
    results = executor.map(task, range(5))
    print(list(results))

Thread-Local Data
import threading

# Each thread sees its own independent attributes on this object.
local_data = threading.local()


def worker(value):
    """Store *value* in thread-local storage and report this thread's view."""
    local_data.x = value
    print(f"Thread {threading.current_thread().name}: x = {local_data.x}")


threads = [threading.Thread(target=worker, args=(i,)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

Timer
import threading


def delayed_task():
    """Callback fired by the timer once its delay elapses."""
    print("Timer fired!")


# Execute after 3 seconds
timer = threading.Timer(3.0, delayed_task)
timer.start()
# Can cancel before it fires
# timer.cancel()

Barrier
import threading
import time

# All three threads must reach the barrier before any may proceed.
barrier = threading.Barrier(3)


def worker(name):
    """Wait at the shared barrier, then continue once all parties arrive."""
    print(f"{name} waiting at barrier")
    barrier.wait()
    print(f"{name} passed barrier")


threads = [
    threading.Thread(target=worker, args=(f"Thread-{i}",))
    for i in range(3)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

Common Patterns
import threading
from concurrent.futures import ThreadPoolExecutor


# Thread-safe singleton via double-checked locking: the unlocked first
# check keeps the common path cheap; the locked second check prevents two
# racing threads from each creating an instance.
class Singleton:
    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
        return cls._instance


# Producer-consumer with queue
def producer_consumer_pattern():
    """Run a bounded producer/consumer pair and return the consumed items.

    The original version only *defined* the worker closures and never
    started them; this version actually runs the pattern to completion
    and returns the list of items the consumer processed (in order).
    """
    import queue

    q = queue.Queue(maxsize=10)
    stop_event = threading.Event()
    consumed = []

    def producer():
        # Stop early if asked; otherwise feed 0..99 into the queue.
        for i in range(100):
            if stop_event.is_set():
                break
            q.put(i)

    def consumer():
        # Keep draining until a stop is requested AND the queue is empty.
        while not stop_event.is_set() or not q.empty():
            try:
                item = q.get(timeout=0.1)
                consumed.append(item)  # Process item
                q.task_done()
            except queue.Empty:
                continue

    prod = threading.Thread(target=producer)
    cons = threading.Thread(target=consumer)
    prod.start()
    cons.start()
    prod.join()
    q.join()          # Wait until every produced item has been processed
    stop_event.set()  # Let the consumer's loop observe the stop and exit
    cons.join()
    return consumed
# Parallel map
def parallel_map(func, items, workers=4):
    with ThreadPoolExecutor(max_workers=workers) as executor:
        return list(executor.map(func, items))

GIL Considerations
import threading
import time


# CPU-bound: the GIL allows only one thread to run Python bytecode at a
# time, so threads do not speed this up.
def cpu_bound():
    """Sum the integers below ten million with a pure-Python loop."""
    total = 0
    for i in range(10_000_000):
        total += i
    return total


# IO-bound: the GIL is released while blocked in sleep/IO, so threads help.
def io_bound():
    """Simulate a one-second blocking IO call."""
    time.sleep(1)
    return "done"


# For CPU-bound, use multiprocessing instead
from multiprocessing import Pool
def cpu_parallel(func, items):
    with Pool() as pool:
        return pool.map(func, items)

Safe Shutdown
import threading
import time


class Worker:
    """A background worker with an explicit, cooperative shutdown."""

    def __init__(self):
        self.running = False  # Cooperative stop flag, polled by _run()
        self.thread = None

    def start(self):
        """Launch the background loop in a new thread."""
        self.running = True
        self.thread = threading.Thread(target=self._run)
        self.thread.start()

    def stop(self):
        """Ask the loop to exit and wait (up to 5 s) for the thread to finish."""
        self.running = False
        if self.thread:
            self.thread.join(timeout=5)

    def _run(self):
        # Poll the flag every 100 ms so stop() takes effect promptly.
        while self.running:
            # Do work
            time.sleep(0.1)


worker = Worker()
worker.start()
time.sleep(2)
worker.stop()

Best Practices
# NOTE: illustrative snippets — `lock`, `func`, `items`, `q`, and `queue`
# refer to objects defined in the sections above.

# Use context managers for locks
with lock:
    # Critical section
    pass

# Prefer ThreadPoolExecutor over manual threads
with ThreadPoolExecutor() as executor:
    results = executor.map(func, items)

# Use Queue for thread communication
q = queue.Queue()

# Set timeouts to avoid deadlocks
lock.acquire(timeout=5)
q.get(timeout=1)
# Use daemon=True for background threads
threading.Thread(target=bg_task, daemon=True)

Threading is good for I/O-bound tasks. For CPU-bound work, use multiprocessing. For async I/O, consider asyncio.
React to this post: