Multithreading dient dazu, zwei Aufgaben gleichzeitig abarbeiten zu lassen. Dies spart je nach Anwendungsfall eine Menge Zeit.Für diesen Teil des Tutorials solltest dich auf jeden Fall mit dem Kapitel Zeitmessung beschäftigt haben
Historisches
Der Prozessor meines C64 war mit 1 MHz Taktfrequenz nicht schnell genug, um mehr als ein Programm gleichzeitig abarbeiten zu können. Erst mit dem Amiga, der einen siebenfach höheren CPU-Takt und ein entsprechend ausgelegtes Betriebsystem (AmigaOS) hatte, lernte ich MultiTaskingkennen, bei dem mehrere Programme scheinbar simultan liefen (genaugenommen wurde die Prozessorzeit der Reihe nach auf die verschiedenen Tasks geschickt verteilt.
MultiThtreading Auf dem Raspberry Pi
Du erinnerst dich, der Raspberry Pi besitzt einen ARM Prozessor mit vier Kernen, die mit 1500 MHz getaktet sind, jeder dieser Kerne kann im Prinzip ein eigenständiges Programm abarbeiten. Allerdings ist er auch schnell genug, u.U. mehrere Threads auf einem Kern auszuführen, das entscheidet das Betriebssystem.
Das threading Modul
Python stellt uns mit dem threading
Modul ein Werkzeug zur parallelen Programmierung zur Verfügung.
Im letzten Kapitel hatten wir die Methoden calc_square() und calc_cube() benutzt, um deren Laufzeit zu messen. Jetzt zeig ich dir, wie diese meiden Berechnung parallel nebeneinander.
from timeit import default_timer as timer
import time
import threading
def calc_square(numbers, verbose=False):
for n in range(1,numbers):
q= n*n
if verbose:
print(f'\n{n} ^ 2 = {q}')
time.sleep(0.1)
def calc_cube(numbers,verbose=False):
for n in range(1,numbers):
k = n*n*n
if verbose:
print(f'\n{n} ^ 3 = {k}')
time.sleep(0.1)
start = timer()
thread_square = threading.Thread(target=calc_square, args=(100,True))
thread_cube = threading.Thread(target=calc_cube, args=(100,True))
thread_cube.start()
thread_square.start()
thread_cube.join()
thread_square.join()
ende = timer()
differenz_mit_print = ende - start
print(f'Zeit mit print():{differenz_mit_print}s')
start = timer()
thread_square = threading.Thread(target=calc_square, args=(100,False))
thread_cube = threading.Thread(target=calc_cube, args=(100,False))
thread_cube.start()
thread_square.start()
thread_cube.join()
thread_square.join()
ende = timer()
differenz_ohne_print = ende - start
print(f'Zeit ohne print():{differenz_ohne_print}s')
Mit thread_square = threading.Thread(target=calc_square, args=(100,True))
erzeugst du ein neues Thread Objekt, in der die Methode als target
Parameter mitgegeben wird, die Parameter für calc_square()
werden als Tupel im args
Parameter übergeben. Für calc_cube()
machen wir das gleiche.
Danach werden beide Threads mit der start()
Methode gestartet. Damit der main()
Thread weiß, dass sie mit ihrer Arbeit fertig sind, führt er die parallelen Threads mittels join()
wieder mit sich zusammen.
Ich denke, grafisch ist das besser zu verstehen.
Du siehst hier, wie die beiden Threads zunächst mit start()
vom main()
Thread abgespalten und nach deren Beendigung durch join()
wieder zusammengeführt werden.
Probleme beim Multithreading
Bei mehreren parallelen Threads kann es zu Problemen kommen, wenn sie gleichzeitig auf die selbe Resource zugreifen wollen. Stell dir vor, zwei Threads wollen gleichzeitig in die selbe globale Variable hineinschreiben. In diesem Fall ist völlig unklar, wer dann seine Daten korrekt schreiben konnte. In der Informatik nennt man sowas Race Condition, weil mehrere Threads eine Art Wettrennen um den Endzustand austragen. Schau dir mal das Programm race_condition.py an:
import threading
import time
import random
gesamtwert = 0
message = ''
def calculate_sum(thread_nummer):
global gesamtwert
global message
for i in range(100 + thread_nummer):
zwischen = gesamtwert
#Zufällige Wartezeit zwischen 0,1s und 0,5s
wartezeit = random.randint(1,5+thread_nummer)
time.sleep(0.1 * wartezeit)
zwischen += 1
gesamtwert = zwischen
message = f'Thread {thread_nummer} fertig'
print(f'Thread + {thread_nummer} abgearbeitet.')
# drei Threads starten, um die Summe der Zahlen zu berechnen
gesamtwert = 0
t1 = threading.Thread(target=calculate_sum, args=(1,))
t2 = threading.Thread(target=calculate_sum, args=(2,))
t3 = threading.Thread(target=calculate_sum, args=(3,))
t1.start()
t2.start()
t3.start()
t1.join()
t3.join()
t2.join()
print(f'gesamtwert={gesamtwert}')
print(f'message={message}')
Hier wird die Methode calculate_sum
in drei Threads gestartet. Die Methode zählt einen Zwischenwert 101 bis 103 mal hoch (je nach übergebener thread_nummer
) und schreibt das Ergebnis in die globale Variable gesamtwert
hinein. Das global
Statement sagt Python, dass es keine neue lokale Variable in der Methode anlegen soll, sondern die programmweite benutzt werden muss! Ich habe noch eine zufällige Wartezeit zwischen 0,1 und 0,5 Sekunden pro Schleifendurchlauf eingebaut, um den Threads unterschiedliche Laufzeiten zu geben.
Ich habe mit dem Programm mal zwei Durchläufe durchgeführt. Beim ersten wird thread_nummer
von der maximalen Wartezeit abgezogen, beim zweiten dazu addiert.
Ich habe mit dem Programm mal zwei Durchläufe durchgeführt. Beim ersten wird thread_nummer
von der maximalen Wartezeit abgezogen, beim zweiten dazu addiert. Dies führt dazu, das einmal Thread 1 die längste Ausführzeit hat, einmal ist es Thread 3
Du erkennst hier, dass der Thread, der am längsten für die Ausführung braucht, die Variablen gesamtwert
und message
am Ende mit seinen Werten belegt. Das ist kein gewünschtes Verhalten, weil es nicht deterministisch ist. Das Ergebnis der anderen beiden Threads ist uns nicht bekannt.
Locks
Um den Zugriff auf Resourcen zu koordinieren, stellt uns das threading Modul Locks zur Verfügung. Ein Lock ist eine Art Bahnsignal, das verhindert, dass ein Zug in einen Gleisabschnitteinfährt, solange ein anderer ihn noch belegt.
Zunächst du einen Lock an.
lock = threading.Lock()
Jeder Zugriff auf gesamtwert
musst du jetzt mit dem Lock absichern.
import threading
import time
import random
gesamtwert = 0
message = ''
lock = threading.Lock()
def calculate_sum(thread_nummer,lock):
global gesamtwert
global message
for i in range(100 + thread_nummer):
with lock:
zwischen = gesamtwert
#Zufällige Wartezeit zwischen 0,1s und 0,5s
wartezeit = random.randint(1,5+thread_nummer)
time.sleep(0.1 * wartezeit)
zwischen += 1
with lock:
gesamtwert = zwischen
message = f'Thread {thread_nummer} fertig'
print(f'Thread + {thread_nummer} abgearbeitet.')
# zwei Threads starten, um die Summe der Zahlen zu berechnen
gesamtwert = 0
t1 = threading.Thread(target=calculate_sum, args=(1,lock,))
t2 = threading.Thread(target=calculate_sum, args=(2,lock,))
t3 = threading.Thread(target=calculate_sum, args=(3,lock,))
t1.start()
t2.start()
t3.start()
t1.join()
t3.join()
t2.join()
print(f'gesamtwert={gesamtwert}')
print(f'message={message}')
Immer, wenn ein Thread auf gesamtwert
zugreiffen will, muss er durch den lock
zunächst warten, bis alle anderen Thrteads den lock
freigegeben haben.
Das Ergebnis in gesamtwert sieht dadurch schon besser aus.
asyncio
Das Modul asyncio ermöglicht das asynchrone Sperren von Zugriffen auf Resourcen. Dadurch wird der Zugriff sehr viel genauer geregelt. Hier also unser kleines Demoprogramm mit asynchronen Locks:
import asyncio
import random
import time
gesamtwert = 0
message = ''
async def calculate_sum(lock, thread_nummer):
global gesamtwert
global message
for i in range(100 + thread_nummer):
async with lock:
zwischen = gesamtwert
#Zufällige Wartezeit zwischen 0,1s und 0,5s
wartezeit = random.randint(1,5+thread_nummer)
time.sleep(0.1 * wartezeit)
zwischen += 1
async with lock:
gesamtwert = zwischen
message = f'Thread {thread_nummer} fertig'
print(f'Thread + {thread_nummer} abgearbeitet.')
async def main():
#async Lock erzeugen
lock = asyncio.Lock()
# drei Threads starten, um die Summe der Zahlen zu berechnen
threads = [asyncio.create_task(calculate_sum(lock, _+1)) for _ in range(3)]
#Warten auf alle Threads
await asyncio.gather(*threads)
print(f'gesamtwert={gesamtwert}')
print(f'message={message}')
if __name__ == '__main__':
asyncio.run(main())
Das enthält ein paar ganz neue Dinge.Zunächst haben wir jetzt eine eigene main()
Methode, die mittels async
für den asynchronen Zugriff markiert wird. Das Konstrukt
f __name__ == '__main__':
asyncio.run(main())
sorgt für einen asynchronen Start von main()
. Die globale Variable __name__
wird vom Python Interpreter für die gestartete Klasse mit ‚__main__‘ vorbelegt. Ein Programmierer, der deinen Programmcode anschaut weiß dadurch sofort, dass es sich bei dieser Quellcodedatei um die zu startende handelt. In Main erzeugen wir zunächst einen asynchronen Lock
lock = asyncio.Lock()
und starten danach drei Threads mit calculate_sum()
threads = [asyncio.create_task(calculate_sum(lock, _+1)) for _ in range(3)]
Auf den Abschluss aller Threads warten wir mit
await asyncio.gather(*threads)
*threads
bedeutet hier einfach, dass threads
nicht als Liste übergeben wird, sondern das jedes Listenelement als einzelner Parameter an die Methode übergeben wird (Unpacking)
Das Ergebnis sieht dann so aus.
Da sich die Threads jetzt nicht mehr gegenseitig blockieren, ist das Ergebnis, wie wir es uns wünschen (Thread 1 zählt bis 101, Thread 2 bis 102 und Thread 3 bis 103, in Summe 306)
Pools
Mit Pools werden keine einzelnen Threads wie bisher benutzt sondern es pro Instanz ein eigener Python Prozess gestartet, der seinen eigenen Interpreter benutzt. Das Betriebsystem entscheidet dann, wie die Prozesse den CPU-Kernen zugewiesen werden.
Bislang hatten wir verschiedenen Funktionen, jeweils einen eigenen Thread zugewiesen. Wenn wir aber eine Berechnung in verschiedenen Teilbereichen zerlegen, können wir diesen mit einem Pool einen eigenen Prozess zuweisen
#encoding: UTF-8
import datetime
from timeit import default_timer as timer
import multiprocessing
import time
import random
from multiprocessing import Pool
__verbose__ = False
# gibt den String s nur aus, wenn __verbose__ True ist.
def print_verbose(s):
if __verbose__:
print(s)
# Arbeitsfunktion läuft in der Zahlenliste von start bis ende
# und summiert die Quadratzahlen in dem Bereich auf.
# Die Methode wird in jedem Prozess mit unterschiedlichen Werten
# für start und ende abgearbeitet.
def calc_sum_square(zahlen, *args,**kwargs):
print(f'calc_sum_square() {args}')
time.sleep(1.0)
start = kwargs['start']
ende = kwargs['end']
threadnr = kwargs['threadnr']
summe = 0
print_verbose(f'von {start} bis {ende}')
for n in range(start,ende):
i = zahlen[n]
print_verbose(f'Berechnung:{threadnr}: {i}, {i*i}')
summe = summe + i*i
return summe
# Diese Methode wird initial pro Prozess von der Poolsteuerung aufgerufen und
# steuert die Verteilung der Parameter.
# f ist der Zeiger auf die aufzurufende Methode
# *arg wnthält das Tupel mit den Argumenten
#
def smap(f, *arg):
# Auflösen des Tupels: args enthält die Liste mit den Zahlen,
# kwarg das Dictionary mit den Parametern für jeden Prozess
args, kwargs = arg
# Aufruf der übergebenen Funktion mit den Parametern
return f(args, **kwargs)
if __name__ == '__main__':
start = timer()
# Liste mit 10 Mio Integerzahlen anlegen.
zahlen = [i for i in range(10000000)]
ende = timer()
print(f'Zahlenliste erzeugt in {ende-start}s')
split = int(len(zahlen) / 3)
print_verbose(f'split={split}')
time.sleep(1.0)
start = timer()
# Pool mit drei Prozesssen
with Pool(processes=3) as pool:
# Die Methode pool.starmap() bekommt neben der aufzurufenden Methode smap() eine Liste von Tupeln.
# Jedes Tupel enthält pro Prozess die auszuführende Methode, die Liste mit den Zahlen und ein Dictionary mit
# den Parametern 'threadnr', 'start' und 'end'
# starmap() wartet solange, bis alle drei Teilprozesse beendet sind und gibt dann eine Liste mit den return Werten
# zurück.
result = pool.starmap(smap, [(calc_sum_square, zahlen, {'threadnr':1,'start':0, 'end':split}), (calc_sum_square, zahlen, {'threadnr':2,'start':split+1,'end':split*2}), (calc_sum_square, zahlen,{'threadnr':3,'start':(split*2)+1,'end':split*3})])
print(f'Ergebnis: {result}')
ende = timer()
print(f'Ausführzeit: {ende-start}s')
Die Arbeitsmethode calc_sum_square()
quadriert in ihrem Wertebereich die Zahlen und gibt die Summe der Quadrate an den Pool zurück. Da jeder Prozess nur ein Drittel der Zahlenliste abarbeitet, ist die Gesamtsumme aller Quadratzahlen von zahlen
theoretisch dreimal schneller berechnet als es in einem einzelnen Thread passiert. Anmerkung: Wenn du die Komplexität der Zahlenliste von 10 Mio auf 100 Mio erhöhst, wird Python mit einem MemoryError
abbrechen.
Die smap Funktion startet die übergebene Funktion f
mit f(args, **kwargs). Auf diese Weise wird die Parameterliste aus der starmap() Methode aufgeteilt und an die Arbeitsmethode weitergereicht. Dieses Programm ist sehr rechenintensiv, daher sollte dein Raspberry Pi über eine gute Kühlung verfügen.
Anwendungsgebiete
Multithreading kann Berechnungen in bestimmten Fällen durch Parallelverarbeitung beschleunigen oder aber besser auf Usereingaben reagieren. Wenn du z.B. ein komplexes Video rendern lässt, ist sinnvoll, das GUI-Handling einem separaten Thread zu behandeln, da sonst das Programm bis zum Ende des Redndering-Prozesses nicht auf Eingaben zeitnah reagieren kann. Ebenso könnte während du diese Seite liest ein eigenständiger Thread verlinkte Seiten vorab laden, so dass sie bereits zur Verfügung stehen, bevor du den URL klickst.
Das Thema Multithreading ist etwas für Profis. Laß dich aber nicht von der Menge der Informationen abschrecken, eigene Experimente damit durchzuführen.
Im nächsten Teil wenden wir und mal der Netzwerkprogrammierung zu.