Multithreading

Multithreading dient dazu, zwei Aufgaben gleichzeitig abarbeiten zu lassen. Dies spart je nach Anwendungsfall eine Menge Zeit.Für diesen Teil des Tutorials solltest dich auf jeden Fall mit dem Kapitel Zeitmessung beschäftigt haben

1Python Programmierkurs
2Python: Methoden
3Kontrollstrukturen
4Strings in Python
5Container
6Objekte in Python
7Module
8Exceptions in Python
9Typkonvertierung
10Python und Dateien
11Datum und Zeit mit Python verarbeiten
12Multithreading
13Netzwerk in Python
14Logging in Python
15GPIO
16Automatische Tests
17Datenbanken mit Python
18Python: Generatoren und List Comprehension
19Python: Webseiten mit Flask
20Python virtuelle Umgebungen
21Interrupts & Signale
22NumPy
23Matplotlib
24match
25Reguläre Ausdrücke
26lambda Funktionen
27logging.config
28Decorators

Historisches

Der Prozessor meines C64 war mit 1 MHz Taktfrequenz nicht schnell genug, um mehr als ein Programm gleichzeitig abarbeiten zu können. Erst mit dem Amiga, der einen siebenfach höheren CPU-Takt und ein entsprechend ausgelegtes Betriebsystem (AmigaOS) hatte, lernte ich MultiTaskingkennen, bei dem mehrere Programme scheinbar simultan liefen (genaugenommen wurde die Prozessorzeit der Reihe nach auf die verschiedenen Tasks geschickt verteilt.

MultiThtreading Auf dem Raspberry Pi

Du erinnerst dich, der Raspberry Pi besitzt einen ARM Prozessor mit vier Kernen, die mit 1500 MHz getaktet sind, jeder dieser Kerne kann im Prinzip ein eigenständiges Programm abarbeiten. Allerdings ist er auch schnell genug, u.U. mehrere Threads auf einem Kern auszuführen, das entscheidet das Betriebssystem.

Das threading Modul

Python stellt uns mit dem threading Modul ein Werkzeug zur parallelen Programmierung zur Verfügung.

Im letzten Kapitel hatten wir die Methoden calc_square() und calc_cube() benutzt, um deren Laufzeit zu messen. Jetzt zeig ich dir, wie diese meiden Berechnung parallel nebeneinander.

from timeit import default_timer as timer
import time
import threading

def calc_square(numbers, verbose=False):
  for n in range(1,numbers):
    q= n*n
    if verbose:
      print(f'\n{n} ^ 2 = {q}')
    time.sleep(0.1)

def calc_cube(numbers,verbose=False):
  for n in range(1,numbers):
    k = n*n*n
    if verbose:
      print(f'\n{n} ^ 3 = {k}')
    time.sleep(0.1)

start = timer()

thread_square = threading.Thread(target=calc_square, args=(100,True))
thread_cube = threading.Thread(target=calc_cube, args=(100,True))

thread_cube.start()
thread_square.start()

thread_cube.join()
thread_square.join()
ende = timer()
differenz_mit_print = ende - start
print(f'Zeit mit print():{differenz_mit_print}s')

start = timer()

thread_square = threading.Thread(target=calc_square, args=(100,False))
thread_cube = threading.Thread(target=calc_cube, args=(100,False))

thread_cube.start()
thread_square.start()

thread_cube.join()
thread_square.join()

ende = timer()
differenz_ohne_print = ende - start
print(f'Zeit ohne print():{differenz_ohne_print}s')

Mit thread_square = threading.Thread(target=calc_square, args=(100,True))erzeugst du ein neues Thread Objekt, in der die Methode als target Parameter mitgegeben wird, die Parameter für calc_square() werden als Tupel im args Parameter übergeben. Für calc_cube() machen wir das gleiche.
Danach werden beide Threads mit der start() Methode gestartet. Damit der main() Thread weiß, dass sie mit ihrer Arbeit fertig sind, führt er die parallelen Threads mittels join() wieder mit sich zusammen.

Ich denke, grafisch ist das besser zu verstehen.

MultiThreading  grafisch dargestellt.
Olli Graf - raspithek.de
GraphCreative Commons Attribution-NonCommercial-ShareAlike 4.0 International License . loading=
MultiThreading grafisch dargestellt.

Du siehst hier, wie die beiden Threads zunächst mit start() vom main() Thread abgespalten und nach deren Beendigung durch join() wieder zusammengeführt werden.

Probleme beim Multithreading

Bei mehreren parallelen Threads kann es zu Problemen kommen, wenn sie gleichzeitig auf die selbe Resource zugreifen wollen. Stell dir vor, zwei Threads wollen gleichzeitig in die selbe globale Variable hineinschreiben. In diesem Fall ist völlig unklar, wer dann seine Daten korrekt schreiben konnte. In der Informatik nennt man sowas Race Condition, weil mehrere Threads eine Art Wettrennen um den Endzustand austragen. Schau dir mal das Programm race_condition.py an:

import threading
import time
import random

gesamtwert = 0
message = ''

def calculate_sum(thread_nummer):
  global gesamtwert
  global message

  for i in range(100 + thread_nummer):
    zwischen = gesamtwert
#Zufällige Wartezeit zwischen 0,1s und 0,5s
    wartezeit = random.randint(1,5+thread_nummer)
    time.sleep(0.1 * wartezeit)

    zwischen += 1
    gesamtwert = zwischen
  message = f'Thread {thread_nummer} fertig'
  print(f'Thread + {thread_nummer} abgearbeitet.')

# drei Threads starten, um die Summe der Zahlen zu berechnen
gesamtwert = 0
t1 = threading.Thread(target=calculate_sum, args=(1,))
t2 = threading.Thread(target=calculate_sum, args=(2,))
t3 = threading.Thread(target=calculate_sum, args=(3,))
t1.start()
t2.start()
t3.start()
t1.join()
t3.join()
t2.join()

print(f'gesamtwert={gesamtwert}')
print(f'message={message}')

Hier wird die Methode calculate_sum in drei Threads gestartet. Die Methode zählt einen Zwischenwert 101 bis 103 mal hoch (je nach übergebener thread_nummer) und schreibt das Ergebnis in die globale Variable gesamtwert hinein. Das global Statement sagt Python, dass es keine neue lokale Variable in der Methode anlegen soll, sondern die programmweite benutzt werden muss! Ich habe noch eine zufällige Wartezeit zwischen 0,1 und 0,5 Sekunden pro Schleifendurchlauf eingebaut, um den Threads unterschiedliche Laufzeiten zu geben.

Ich habe mit dem Programm mal zwei Durchläufe durchgeführt. Beim ersten wird thread_nummer von der maximalen Wartezeit abgezogen, beim zweiten dazu addiert.

Ich habe mit dem Programm mal zwei Durchläufe durchgeführt. Beim ersten wird thread_nummer von der maximalen Wartezeit abgezogen, beim zweiten dazu addiert. Dies führt dazu, das einmal Thread 1 die längste Ausführzeit hat, einmal ist es Thread 3

Ausführung von race_condition.py Einmal gewinnt Thread 1, das andere Mal Thread 3
Olli Graf - raspithek.de
race_conditionCreative Commons Attribution-NonCommercial-ShareAlike 4.0 International License . loading=
Ausführung von race_condition.py Einmal gewinnt Thread 1, das andere Mal Thread 3

Du erkennst hier, dass der Thread, der am längsten für die Ausführung braucht, die Variablen gesamtwert und message am Ende mit seinen Werten belegt. Das ist kein gewünschtes Verhalten, weil es nicht deterministisch ist. Das Ergebnis der anderen beiden Threads ist uns nicht bekannt.

Locks

Um den Zugriff auf Resourcen zu koordinieren, stellt uns das threading Modul Locks zur Verfügung. Ein Lock ist eine Art Bahnsignal, das verhindert, dass ein Zug in einen Gleisabschnitteinfährt, solange ein anderer ihn noch belegt.

Zunächst du einen Lock an.

lock = threading.Lock()

Jeder Zugriff auf gesamtwert musst du jetzt mit dem Lock absichern.

import threading
import time
import random

gesamtwert = 0
message = ''
lock = threading.Lock()


def calculate_sum(thread_nummer,lock):
  global gesamtwert
  global message

  for i in range(100 + thread_nummer):
    with lock:
      zwischen = gesamtwert
#Zufällige Wartezeit zwischen 0,1s und 0,5s
      wartezeit = random.randint(1,5+thread_nummer)
      time.sleep(0.1 * wartezeit)

      zwischen += 1
   
    with lock:
      gesamtwert = zwischen

  message = f'Thread {thread_nummer} fertig'
  print(f'Thread + {thread_nummer} abgearbeitet.')

# zwei Threads starten, um die Summe der Zahlen zu berechnen
gesamtwert = 0
t1 = threading.Thread(target=calculate_sum, args=(1,lock,))
t2 = threading.Thread(target=calculate_sum, args=(2,lock,))
t3 = threading.Thread(target=calculate_sum, args=(3,lock,))
t1.start()
t2.start()
t3.start()
t1.join()
t3.join()
t2.join()

print(f'gesamtwert={gesamtwert}')
print(f'message={message}')

Immer, wenn ein Thread auf gesamtwert zugreiffen will, muss er durch den lock zunächst warten, bis alle anderen Thrteads den lock freigegeben haben.
Das Ergebnis in gesamtwert sieht dadurch schon besser aus.

usführung von race_condition_lock. Die Threads dürfen nicht mehr gleichzeitig auf gesamtwert zugreifen.
Olli Graf - raspithek.de
race_condition_lockCreative Commons Attribution-NonCommercial-ShareAlike 4.0 International License . loading=
Ausführung von race_condition_lock. Die Threads dürfen nicht mehr gleichzeitig auf gesamtwert zugreifen.

asyncio

Das Modul asyncio ermöglicht das asynchrone Sperren von Zugriffen auf Resourcen. Dadurch wird der Zugriff sehr viel genauer geregelt. Hier also unser kleines Demoprogramm mit asynchronen Locks:

import asyncio
import random
import time

gesamtwert = 0
message = ''


async def calculate_sum(lock, thread_nummer):
  global gesamtwert
  global message

  for i in range(100 + thread_nummer):
    async with lock:
      zwischen = gesamtwert
#Zufällige Wartezeit zwischen 0,1s und 0,5s
      wartezeit = random.randint(1,5+thread_nummer)
      time.sleep(0.1 * wartezeit)

      zwischen += 1

    async with lock:
      gesamtwert = zwischen

  message = f'Thread {thread_nummer} fertig'
  print(f'Thread + {thread_nummer} abgearbeitet.')

async def main():

#async Lock erzeugen
  lock = asyncio.Lock()

# drei Threads starten, um die Summe der Zahlen zu berechnen

  threads = [asyncio.create_task(calculate_sum(lock, _+1)) for _ in range(3)]

#Warten auf alle Threads
  await asyncio.gather(*threads)

  print(f'gesamtwert={gesamtwert}')
  print(f'message={message}')

if __name__ == '__main__':
    asyncio.run(main())

Das enthält ein paar ganz neue Dinge.Zunächst haben wir jetzt eine eigene main() Methode, die mittels async für den asynchronen Zugriff markiert wird. Das Konstrukt

f __name__ == '__main__':
    asyncio.run(main())

sorgt für einen asynchronen Start von main(). Die globale Variable __name__ wird vom Python Interpreter für die gestartete Klasse mit ‚__main__‘ vorbelegt. Ein Programmierer, der deinen Programmcode anschaut weiß dadurch sofort, dass es sich bei dieser Quellcodedatei um die zu startende handelt. In Main erzeugen wir zunächst einen asynchronen Lock

lock = asyncio.Lock()

und starten danach drei Threads mit calculate_sum()

threads = [asyncio.create_task(calculate_sum(lock, _+1)) for _ in range(3)]

Auf den Abschluss aller Threads warten wir mit

await asyncio.gather(*threads)

*threads bedeutet hier einfach, dass threads nicht als Liste übergeben wird, sondern das jedes Listenelement als einzelner Parameter an die Methode übergeben wird (Unpacking)

Das Ergebnis sieht dann so aus.

rgebnis von race_condition_asyncio.py. Das Ergebnis ist jetzt wie erwartet.
Olli Graf - raspithek.de
race_condition_asyncioCreative Commons Attribution-NonCommercial-ShareAlike 4.0 International License . loading=
Ergebnis von race_condition_asyncio.py. Das Ergebnis ist jetzt wie erwartet.

Da sich die Threads jetzt nicht mehr gegenseitig blockieren, ist das Ergebnis, wie wir es uns wünschen (Thread 1 zählt bis 101, Thread 2 bis 102 und Thread 3 bis 103, in Summe 306)

Pools

Mit Pools werden keine einzelnen Threads wie bisher benutzt sondern es pro Instanz ein eigener Python Prozess gestartet, der seinen eigenen Interpreter benutzt. Das Betriebsystem entscheidet dann, wie die Prozesse den CPU-Kernen zugewiesen werden.

Bislang hatten wir verschiedenen Funktionen, jeweils einen eigenen Thread zugewiesen. Wenn wir aber eine Berechnung in verschiedenen Teilbereichen zerlegen, können wir diesen mit einem Pool einen eigenen Prozess zuweisen

#encoding: UTF-8

import datetime
from timeit import default_timer as timer
import multiprocessing
import time
import random
from multiprocessing import Pool

__verbose__ = False

# gibt den String s nur aus, wenn __verbose__ True ist.
def print_verbose(s):
  if __verbose__:
    print(s)

# Arbeitsfunktion läuft in der Zahlenliste von start bis ende
# und summiert die Quadratzahlen in dem Bereich auf.
# Die Methode wird in jedem Prozess mit unterschiedlichen Werten
# für start und ende abgearbeitet.
def calc_sum_square(zahlen, *args,**kwargs):
  print(f'calc_sum_square() {args}')
  time.sleep(1.0)
  start = kwargs['start']
  ende =  kwargs['end']
  threadnr = kwargs['threadnr']
  summe  = 0

  print_verbose(f'von {start} bis {ende}')
  for n in range(start,ende):
    i = zahlen[n]
    print_verbose(f'Berechnung:{threadnr}: {i}, {i*i}')
    summe = summe + i*i

  return summe

# Diese Methode wird initial pro Prozess von der Poolsteuerung aufgerufen und
# steuert die Verteilung der Parameter.
# f ist der Zeiger auf die aufzurufende Methode
# *arg wnthält das Tupel mit den Argumenten
# 
def smap(f, *arg):

# Auflösen des Tupels: args enthält die Liste mit den Zahlen,
# kwarg das Dictionary mit den Parametern für jeden Prozess
  args, kwargs = arg
  
# Aufruf der übergebenen Funktion mit den Parametern
  return f(args, **kwargs)


if __name__ == '__main__':


  start = timer()
 # Liste mit 10 Mio Integerzahlen anlegen.
  zahlen = [i for i in range(10000000)]
  ende = timer()
  print(f'Zahlenliste erzeugt in {ende-start}s')

  split = int(len(zahlen) / 3)
  print_verbose(f'split={split}')
  time.sleep(1.0)

  start = timer()
# Pool mit drei Prozesssen
  with Pool(processes=3) as pool:
# Die Methode pool.starmap() bekommt neben der aufzurufenden Methode smap() eine Liste von Tupeln.
# Jedes Tupel enthält pro Prozess die auszuführende Methode, die Liste mit den Zahlen und ein Dictionary mit
# den Parametern 'threadnr', 'start' und 'end'
# starmap() wartet solange, bis alle drei Teilprozesse beendet sind und gibt dann eine Liste mit den return Werten
# zurück.
    result = pool.starmap(smap, [(calc_sum_square, zahlen, {'threadnr':1,'start':0, 'end':split}), (calc_sum_square, zahlen, {'threadnr':2,'start':split+1,'end':split*2}), (calc_sum_square, zahlen,{'threadnr':3,'start':(split*2)+1,'end':split*3})])
    print(f'Ergebnis: {result}')
  ende = timer()
  print(f'Ausführzeit: {ende-start}s')

Die Arbeitsmethode calc_sum_square() quadriert in ihrem Wertebereich die Zahlen und gibt die Summe der Quadrate an den Pool zurück. Da jeder Prozess nur ein Drittel der Zahlenliste abarbeitet, ist die Gesamtsumme aller Quadratzahlen von zahlen theoretisch dreimal schneller berechnet als es in einem einzelnen Thread passiert. Anmerkung: Wenn du die Komplexität der Zahlenliste von 10 Mio auf 100 Mio erhöhst, wird Python mit einem MemoryError abbrechen.

Die smap Funktion startet die übergebene Funktion f mit f(args, **kwargs). Auf diese Weise wird die Parameterliste aus der starmap() Methode aufgeteilt und an die Arbeitsmethode weitergereicht. Dieses Programm ist sehr rechenintensiv, daher sollte dein Raspberry Pi über eine gute Kühlung verfügen.

htop Screenshot, zu sehen sind die 4 python Prozesse (1 Main Prozess und 3 von uns gestartete Prozesse
Olli Graf - raspithek.de
pool_squareCreative Commons Attribution-NonCommercial-ShareAlike 4.0 International License . loading=
htop Screenshot, zu sehen sind die 4 python Prozesse (1 Main Prozess und 3 von uns gestartete Prozesse
Olli Graf - raspithek.de
pool_square_tempCreative Commons Attribution-NonCommercial-ShareAlike 4.0 International License . loading=
Temperaturentwicklung, der CPU-Temperatur steigt bei Ausführung um satte 3,4°C

Anwendungsgebiete

Multithreading kann Berechnungen in bestimmten Fällen durch Parallelverarbeitung beschleunigen oder aber besser auf Usereingaben reagieren. Wenn du z.B. ein komplexes Video rendern lässt, ist sinnvoll, das GUI-Handling einem separaten Thread zu behandeln, da sonst das Programm bis zum Ende des Redndering-Prozesses nicht auf Eingaben zeitnah reagieren kann. Ebenso könnte während du diese Seite liest ein eigenständiger Thread verlinkte Seiten vorab laden, so dass sie bereits zur Verfügung stehen, bevor du den URL klickst.

Das Thema Multithreading ist etwas für Profis. Laß dich aber nicht von der Menge der Informationen abschrecken, eigene Experimente damit durchzuführen.
Im nächsten Teil wenden wir und mal der Netzwerkprogrammierung zu.

Schreibe einen Kommentar

Creative Commons License
Except where otherwise noted, the content on this site is licensed under a Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License.
Olli Graf - raspithek.de
WordPress Cookie Hinweis von Real Cookie Banner