Threading lets you run code concurrently. Here's how to use it safely in Python.

Basic Threads

import threading
import time
 
def worker(name, delay):
    print(f"{name} starting")
    time.sleep(delay)
    print(f"{name} done")
 
# Create and start threads
t1 = threading.Thread(target=worker, args=("Thread-1", 2))
t2 = threading.Thread(target=worker, args=("Thread-2", 1))
 
t1.start()
t2.start()
 
# Wait for completion
t1.join()
t2.join()
 
print("All done")

Thread with Return Value

import threading
 
class ResultThread(threading.Thread):
    def __init__(self, func, args=()):
        super().__init__()
        self.func = func
        self.args = args
        self.result = None
    
    def run(self):
        self.result = self.func(*self.args)
 
def compute(x):
    return x * x
 
t = ResultThread(compute, (5,))
t.start()
t.join()
print(t.result)  # 25

Daemon Threads

import threading
import time
 
def background_task():
    while True:
        print("Background running...")
        time.sleep(1)
 
# Daemon threads are killed when main thread exits
t = threading.Thread(target=background_task, daemon=True)
t.start()
 
time.sleep(3)
print("Main thread exiting")
# Daemon thread is automatically terminated

Locks

import threading
 
counter = 0
lock = threading.Lock()
 
def increment():
    global counter
    for _ in range(100000):
        with lock:  # Thread-safe
            counter += 1
 
threads = [threading.Thread(target=increment) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
 
print(counter)  # 500000 (correct)

RLock (Reentrant Lock)

import threading
 
rlock = threading.RLock()
 
def outer():
    with rlock:
        print("Outer acquired")
        inner()  # Can acquire same lock again
 
def inner():
    with rlock:
        print("Inner acquired")
 
# Regular Lock would deadlock here
outer()

Condition Variables

import threading
 
queue = []
condition = threading.Condition()
 
def producer():
    for i in range(5):
        with condition:
            queue.append(i)
            print(f"Produced {i}")
            condition.notify()
 
def consumer():
    while True:
        with condition:
            while not queue:
                condition.wait()
            item = queue.pop(0)
            print(f"Consumed {item}")
            if item == 4:
                break
 
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t2.start()
t1.start()
t1.join()
t2.join()

Events

import threading
import time
 
event = threading.Event()
 
def waiter():
    print("Waiting for event...")
    event.wait()
    print("Event received!")
 
def setter():
    time.sleep(2)
    print("Setting event")
    event.set()
 
t1 = threading.Thread(target=waiter)
t2 = threading.Thread(target=setter)
t1.start()
t2.start()
t1.join()
t2.join()
 
# Reset event
event.clear()

Semaphore

import threading
import time
 
# Limit concurrent access
semaphore = threading.Semaphore(3)
 
def limited_resource(name):
    with semaphore:
        print(f"{name} acquired")
        time.sleep(1)
        print(f"{name} released")
 
threads = [threading.Thread(target=limited_resource, args=(f"Thread-{i}",)) for i in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

Queue (Thread-Safe)

import threading
import queue
import time
 
q = queue.Queue()
 
def producer():
    for i in range(5):
        q.put(i)
        print(f"Produced {i}")
        time.sleep(0.1)
    q.put(None)  # Sentinel
 
def consumer():
    while True:
        item = q.get()
        if item is None:
            break
        print(f"Consumed {item}")
        q.task_done()
 
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start()
t2.start()
t1.join()
t2.join()

ThreadPoolExecutor

from concurrent.futures import ThreadPoolExecutor, as_completed
import time
 
def task(n):
    time.sleep(n)
    return n * n
 
# Submit tasks
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(task, i) for i in range(5)]
    
    # Get results as they complete
    for future in as_completed(futures):
        print(f"Result: {future.result()}")
 
# Map pattern
with ThreadPoolExecutor(max_workers=4) as executor:
    results = executor.map(task, range(5))
    print(list(results))

Thread-Local Data

import threading
 
# Each thread gets its own copy
local_data = threading.local()
 
def worker(value):
    local_data.x = value
    print(f"Thread {threading.current_thread().name}: x = {local_data.x}")
 
threads = [threading.Thread(target=worker, args=(i,)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

Timer

import threading
 
def delayed_task():
    print("Timer fired!")
 
# Execute after 3 seconds
timer = threading.Timer(3.0, delayed_task)
timer.start()
 
# Can cancel before it fires
# timer.cancel()

Barrier

import threading
import time
 
barrier = threading.Barrier(3)
 
def worker(name):
    print(f"{name} waiting at barrier")
    barrier.wait()
    print(f"{name} passed barrier")
 
threads = [threading.Thread(target=worker, args=(f"Thread-{i}",)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

Common Patterns

import threading
from concurrent.futures import ThreadPoolExecutor
 
# Thread-safe singleton
class Singleton:
    _instance = None
    _lock = threading.Lock()
    
    def __new__(cls):
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
        return cls._instance
 
# Producer-consumer with queue
def producer_consumer_pattern():
    import queue
    
    q = queue.Queue(maxsize=10)
    stop_event = threading.Event()
    
    def producer():
        for i in range(100):
            if stop_event.is_set():
                break
            q.put(i)
    
    def consumer():
        while not stop_event.is_set() or not q.empty():
            try:
                item = q.get(timeout=0.1)
                # Process item
                q.task_done()
            except queue.Empty:
                continue
 
# Parallel map
def parallel_map(func, items, workers=4):
    with ThreadPoolExecutor(max_workers=workers) as executor:
        return list(executor.map(func, items))

GIL Considerations

import threading
import time
 
# CPU-bound: GIL limits parallelism
def cpu_bound():
    total = 0
    for i in range(10_000_000):
        total += i
    return total
 
# IO-bound: GIL released during IO, threads help
def io_bound():
    time.sleep(1)
    return "done"
 
# For CPU-bound, use multiprocessing instead
from multiprocessing import Pool
 
def cpu_parallel(func, items):
    with Pool() as pool:
        return pool.map(func, items)

Safe Shutdown

import threading
import time
 
class Worker:
    def __init__(self):
        self.running = False
        self.thread = None
    
    def start(self):
        self.running = True
        self.thread = threading.Thread(target=self._run)
        self.thread.start()
    
    def stop(self):
        self.running = False
        if self.thread:
            self.thread.join(timeout=5)
    
    def _run(self):
        while self.running:
            # Do work
            time.sleep(0.1)
 
worker = Worker()
worker.start()
time.sleep(2)
worker.stop()

Best Practices

# Use context managers for locks
with lock:
    # Critical section
    pass
 
# Prefer ThreadPoolExecutor over manual threads
with ThreadPoolExecutor() as executor:
    results = executor.map(func, items)
 
# Use Queue for thread communication
q = queue.Queue()
 
# Set timeouts to avoid deadlocks
lock.acquire(timeout=5)
q.get(timeout=1)
 
# Use daemon=True for background threads
threading.Thread(target=bg_task, daemon=True)

Threading is good for I/O-bound tasks. For CPU-bound work, use multiprocessing. For async I/O, consider asyncio.

React to this post: