Cum să utilizați filetare în Python?

Am încercat să înțeleg de filetare în Python. Am'am uitat la documentare și exemple, dar destul de sincer, de multe exemple sunt prea sofisticate și am'm având probleme în a le înțelege.

Cum vă arată în mod clar sarcinile fiind împărțite pentru multi-threading?

Comentarii la întrebare (6)

Când această întrebare a fost întrebat în 2010, a existat o simplificare reală în modul de a face simplu multithreading cu python cu harta și biliard.

Codul de mai jos vine de la un articol/post pe blog că ar trebui să verificați cu siguranta (nici o afiliere) - Paralelism într-o singură linie: Un Model Mai bun pentru o zi la Zi de Filetat Sarcini. Am'll rezuma mai jos - se termină prin a fi doar câteva linii de cod:

from multiprocessing.dummy import Pool as ThreadPool 
pool = ThreadPool(4) 
results = pool.map(my_function, my_array)

Care este multithreaded versiune de:

results = []
for item in my_array:
    results.append(my_function(item))

Descriere

Harta este o funcție, și cheia cu ușurință injectarea de paralelism în cod Python. Pentru cei nefamiliarizati, harta este ceva ridicat de limbi funcționale, cum ar fi Lisp. Este o funcție care descrie o altă funcție pe o secvență.

Harta se ocupă de repetare de-a lungul secvență pentru noi, se aplică funcția, și stochează toate rezultatele într-o listă la îndemână de la sfârșitul.


Implementarea

Paralel versiuni de funcția hartă sunt furnizate de două biblioteci:multiprocesare, și, de asemenea, puțin cunoscute, dar la fel de fantastic pas copil:multiprocesare.dummy.

multiprocesare.dummy este exact la fel ca multiprocesare modulului, dar foloseste fire în loc (o distincție importantă - utilizați mai multe procese pentru CPU-intensive sarcini; fire (și în timpul) IO):

multiprocesare.dummy reproduce API de multiprocesare, dar nu mai mult decât un înveliș în jurul valorii de filetare module.

import urllib2 
from multiprocessing.dummy import Pool as ThreadPool 

urls = [
  'http://www.python.org', 
  'http://www.python.org/about/',
  'http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html',
  'http://www.python.org/doc/',
  'http://www.python.org/download/',
  'http://www.python.org/getit/',
  'http://www.python.org/community/',
  'https://wiki.python.org/moin/',
]

# make the Pool of workers
pool = ThreadPool(4) 

# open the urls in their own threads
# and return the results
results = pool.map(urllib2.urlopen, urls)

# close the pool and wait for the work to finish 
pool.close() 
pool.join() 

Și calendarul rezultate:

Single thread:   14.4 seconds
       4 Pool:   3.1 seconds
       8 Pool:   1.4 seconds
      13 Pool:   1.3 seconds

Trecerea de mai multe argumente (funcționează ca aceasta numai în Python 3.3 și mai târziu):

Pentru a trece mai multe tablouri:

results = pool.starmap(function, zip(list_a, list_b))

sau să treacă o constantă și o matrice:

results = pool.starmap(function, zip(itertools.repeat(constant), list_a))

Dacă utilizați o versiune anterioară de Python, puteți trece mai multe argumente prin această soluție.

(Vă mulțumim pentru user136036 pentru comentariu util)

Comentarii (30)

Aici's un exemplu simplu: ai nevoie pentru a încerca o alternativă câteva Url-uri și de a reveni la conținutul de primul pentru a răspunde.

import Queue
import threading
import urllib2

# called by each thread
def get_url(q, url):
    q.put(urllib2.urlopen(url).read())

theurls = ["http://google.com", "http://yahoo.com"]

q = Queue.Queue()

for u in theurls:
    t = threading.Thread(target=get_url, args = (q,u))
    t.daemon = True
    t.start()

s = q.get()
print s

Acesta este un caz în filetat este utilizat ca un simplu optimizare: fiecare subthread este în așteptare pentru un URL pentru a rezolva și de a răspunde, în scopul de a pune conținutul său pe coada; fiecare subiect este un daemon (a castigat't menține procesul dacă firul principal se termină ... asta's mult mai frecvente decât nu); firul principal începe subthreads, nu o "a lua" la coadă să aștepte până când unul dintre ei a făcut-o "pune", apoi emite rezultatele și se termină (care ia în jos orice subthreads care ar mai putea fi difuzate, deoarece acestea're fire daemon).

Utilizarea corectă a firelor în Python este invariabil legat de operațiunile de I/O (din CPython nu't de a folosi mai multe nuclee pentru a rula CPU-bound sarcini oricum, singurul motiv pentru filetare nu este blocarea procesului în timp ce nu's-o așteptați de ceva I/O). Cozile sunt aproape invariabil, cel mai bun mod pentru ferma de muncă la fire și/sau să colecteze munca's rezultatele, de altfel, și-au're intrinsec sigură pentru fire deci se salva de la griji despre încuietori, condiții, evenimente, semafoare și alte inter-thread coordonare/comunicare concepte.

Comentarii (14)

NOTĂ: efectiv Pentru paralelizare în Python, ar trebui să utilizați multiprocesare module pentru a plăti mai multe procese care se execută în paralel (ca urmare a interpret global de blocare, Python fire oferi intercalarea dar sunt, de fapt, executate în serie, nu in paralel, si sunt utile numai atunci când intercalarea operațiuni I/O).

Cu toate acestea, dacă sunteți doar în căutarea pentru intercalarea (sau se fac operațiuni de I/O care pot fi paralelizate în ciuda globale interpret de blocare), apoi filetare modulul este locul pentru a începe. Ca un exemplu simplu, să's ia în considerare problema de însumând o gamă largă prin însumarea subranges în paralel:

import threading

class SummingThread(threading.Thread):
     def __init__(self,low,high):
         super(SummingThread, self).__init__()
         self.low=low
         self.high=high
         self.total=0

     def run(self):
         for i in range(self.low,self.high):
             self.total+=i

thread1 = SummingThread(0,500000)
thread2 = SummingThread(500000,1000000)
thread1.start() # This actually causes the thread to run
thread2.start()
thread1.join()  # This waits until the thread has completed
thread2.join()  
# At this point, both threads have completed
result = thread1.total + thread2.total
print result

Rețineți că cele de mai sus este un foarte prost exemplu, ca nu-i absolut nici I/O și vor fi executate în serie, deși intercalat (cu a adăugat aeriene din context switching) în CPython din cauza globale interpret de blocare.

Comentarii (8)

Ca și alții menționat, CPython pot folosi fire doar pentru am\O așteaptă din cauza GIL. Dacă doriți să beneficiați de mai multe nuclee de CPU-bound sarcinile, utilizați multiprocesare:

from multiprocessing import Process

def f(name):
    print 'hello', name

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()
Comentarii (5)

Doar o notă, Coada nu este necesară pentru filetare.

Acesta este cel mai simplu exemplu am putea imagina că arată 10 procese care rulează în același timp.

import threading
from random import randint
from time import sleep

def print_number(number):
    # Sleeps a random 1 to 10 seconds
    rand_int_var = randint(1, 10)
    sleep(rand_int_var)
    print "Thread " + str(number) + " slept for " + str(rand_int_var) + " seconds"

thread_list = []

for i in range(1, 10):
    # Instantiates the thread
    # (i) does not make a sequence, so (i,)
    t = threading.Thread(target=print_number, args=(i,))
    # Sticks the thread in a list so that it remains accessible
    thread_list.append(t)

# Starts threads
for thread in thread_list:
    thread.start()

# This blocks the calling thread until the thread whose join() method is called is terminated.
# From http://docs.python.org/2/library/threading.html#thread-objects
for thread in thread_list:
    thread.join()

# Demonstrates that the main process waited for threads to complete
print "Done"
Comentarii (5)

Răspunsul la Alex Martelli m-a ajutat, cu toate acestea, aici este o versiune modificată care am crezut eu că e mai util (cel putin pentru mine).

Actualizat: funcționează în ambele python2 și python3

try:
    # for python3
    import queue
    from urllib.request import urlopen
except:
    # for python2 
    import Queue as queue
    from urllib2 import urlopen

import threading

worker_data = ['http://google.com', 'http://yahoo.com', 'http://bing.com']

#load up a queue with your data, this will handle locking
q = queue.Queue()
for url in worker_data:
    q.put(url)

#define a worker function
def worker(url_queue):
    queue_full = True
    while queue_full:
        try:
            #get your data off the queue, and do some work
            url = url_queue.get(False)
            data = urlopen(url).read()
            print(len(data))

        except queue.Empty:
            queue_full = False

#create as many threads as you want
thread_count = 5
for i in range(thread_count):
    t = threading.Thread(target=worker, args = (q,))
    t.start()
Comentarii (6)

Am găsit acest lucru foarte util: a crea cât mai multe fire ca nuclee și lăsați-le să execute o (mare) numărul de sarcini (în acest caz, de asteptare un program shell):

import Queue
import threading
import multiprocessing
import subprocess

q = Queue.Queue()
for i in range(30): #put 30 tasks in the queue
    q.put(i)

def worker():
    while True:
        item = q.get()
        #execute a task: call a shell program and wait until it completes
        subprocess.call("echo "+str(item), shell=True) 
        q.task_done()

cpus=multiprocessing.cpu_count() #detect number of cores
print("Creating %d threads" % cpus)
for i in range(cpus):
     t = threading.Thread(target=worker)
     t.daemon = True
     t.start()

q.join() #block until all tasks are done
Comentarii (3)

Având o funcție "f", fir astfel:

import threading
threading.Thread(target=f).start()

Pentru a trece de argumente pentru a "f"

threading.Thread(target=f, args=(a,b,c)).start()
Comentarii (5)

Python 3-a facilitatea de Lansarea paralel sarcini. Acest lucru face munca mai ușoară.

A pentru fir comun și Proces comun.

Următoarele vă oferă o imagine de ansamblu:

ThreadPoolExecutor Exemplu

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

ProcessPoolExecutor

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()
Comentarii (0)

Pentru mine, exemplul perfect pentru Filetare este monitorizarea evenimente Asincrone. Uita-te la acest cod.

# thread_test.py
import threading
import time 

class Monitor(threading.Thread):
    def __init__(self, mon):
        threading.Thread.__init__(self)
        self.mon = mon

    def run(self):
        while True:
            if self.mon[0] == 2:
                print "Mon = 2"
                self.mon[0] = 3;

Te poti juca cu acest cod prin deschiderea unui IPython sesiune și de a face ceva de genul:

>>>from thread_test import Monitor
>>>a = [0]
>>>mon = Monitor(a)
>>>mon.start()
>>>a[0] = 2
Mon = 2
>>>a[0] = 2
Mon = 2

Așteptați câteva minute

>>>a[0] = 2
Mon = 2
Comentarii (5)

Folosind aprins nou concurente.futures modulul

def sqr(val):
    import time
    time.sleep(0.1)
    return val * val

def process_result(result):
    print(result)

def process_these_asap(tasks):
    import concurrent.futures

    with concurrent.futures.ProcessPoolExecutor() as executor:
        futures = []
        for task in tasks:
            futures.append(executor.submit(sqr, task))

        for future in concurrent.futures.as_completed(futures):
            process_result(future.result())
        # Or instead of all this just do:
        # results = executor.map(sqr, tasks)
        # list(map(process_result, results))

def main():
    tasks = list(range(10))
    print('Processing {} tasks'.format(len(tasks)))
    process_these_asap(tasks)
    print('Done')
    return 0

if __name__ == '__main__':
    import sys
    sys.exit(main())

Executorul abordare ar putea părea familiar pentru toți cei care au ajuns pe mâinile lor murdare cu Java înainte.

De asemenea, pe o parte notă: Pentru a menține universul sănătos, don't uitați să închideți piscine/executorii, dacă nu't folosi " cu " context (care este atât de minunat că o face pentru tine)

Comentarii (0)

Cele mai multe documentații și tutoriale folosi Python's Filetat și "Coadă", modulul ar putea părea copleșitoare pentru incepatori.

Poate lua în considerare simultane.contracte futures.ThreadPoolExecutor` modulul de python 3. Combinat cu " cu " clauză și lista de înțelegere ar putea fi un real farmec.

from concurrent.futures import ThreadPoolExecutor, as_completed

def get_url(url):
    # Your actual program here. Using threading.Lock() if necessary
    return ""

# List of urls to fetch
urls = ["url1", "url2"]

with ThreadPoolExecutor(max_workers = 5) as executor:

    # Create threads 
    futures = {executor.submit(get_url, url) for url in urls}

    # as_completed() gives you the threads once finished
    for f in as_completed(futures):
        # Get the results 
        rs = f.result()
Comentarii (0)

Am văzut o mulțime de exemple aici, unde nu au fost efectuate + au cea mai mare unitate PROCESOR legat. Aici este un exemplu de un PROCESOR legat de sarcina care calculează toate numerele prime între 10 milioane și 10.05 milioane de euro. Eu am folosit toate cele 4 metode aici

import math
import timeit
import threading
import multiprocessing
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def time_stuff(fn):
    """
    Measure time of execution of a function
    """
    def wrapper(*args, **kwargs):
        t0 = timeit.default_timer()
        fn(*args, **kwargs)
        t1 = timeit.default_timer()
        print("{} seconds".format(t1 - t0))
    return wrapper

def find_primes_in(nmin, nmax):
    """
    Compute a list of prime numbers between the given minimum and maximum arguments
    """
    primes = []

    #Loop from minimum to maximum
    for current in range(nmin, nmax + 1):

        #Take the square root of the current number
        sqrt_n = int(math.sqrt(current))
        found = False

        #Check if the any number from 2 to the square root + 1 divides the current numnber under consideration
        for number in range(2, sqrt_n + 1):

            #If divisible we have found a factor, hence this is not a prime number, lets move to the next one
            if current % number == 0:
                found = True
                break

        #If not divisible, add this number to the list of primes that we have found so far
        if not found:
            primes.append(current)

    #I am merely printing the length of the array containing all the primes but feel free to do what you want
    print(len(primes))

@time_stuff
def sequential_prime_finder(nmin, nmax):
    """
    Use the main process and main thread to compute everything in this case
    """
    find_primes_in(nmin, nmax)

@time_stuff
def threading_prime_finder(nmin, nmax):
    """
    If the minimum is 1000 and the maximum is 2000 and we have 4 workers
    1000 - 1250 to worker 1
    1250 - 1500 to worker 2
    1500 - 1750 to worker 3
    1750 - 2000 to worker 4
    so lets split the min and max values according to the number of workers
    """
    nrange = nmax - nmin
    threads = []
    for i in range(8):
        start = int(nmin + i * nrange/8)
        end = int(nmin + (i + 1) * nrange/8)

        #Start the thrread with the min and max split up to compute
        #Parallel computation will not work here due to GIL since this is a CPU bound task
        t = threading.Thread(target = find_primes_in, args = (start, end))
        threads.append(t)
        t.start()

    #Dont forget to wait for the threads to finish
    for t in threads:
        t.join()

@time_stuff
def processing_prime_finder(nmin, nmax):
    """
    Split the min, max interval similar to the threading method above but use processes this time
    """
    nrange = nmax - nmin
    processes = []
    for i in range(8):
        start = int(nmin + i * nrange/8)
        end = int(nmin + (i + 1) * nrange/8)
        p = multiprocessing.Process(target = find_primes_in, args = (start, end))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

@time_stuff
def thread_executor_prime_finder(nmin, nmax):
    """
    Split the min max interval similar to the threading method but use thread pool executor this time
    This method is slightly faster than using pure threading as the pools manage threads more efficiently
    This method is still slow due to the GIL limitations since we are doing a CPU bound task
    """
    nrange = nmax - nmin
    with ThreadPoolExecutor(max_workers = 8) as e:
        for i in range(8):
            start = int(nmin + i * nrange/8)
            end = int(nmin + (i + 1) * nrange/8)
            e.submit(find_primes_in, start, end)

@time_stuff
def process_executor_prime_finder(nmin, nmax):
    """
    Split the min max interval similar to the threading method but use the process pool executor
    This is the fastest method recorded so far as it manages process efficiently + overcomes GIL limitations
    RECOMMENDED METHOD FOR CPU BOUND TASKS
    """
    nrange = nmax - nmin
    with ProcessPoolExecutor(max_workers = 8) as e:
        for i in range(8):
            start = int(nmin + i * nrange/8)
            end = int(nmin + (i + 1) * nrange/8)
            e.submit(find_primes_in, start, end)

def main():
    nmin = int(1e7)
    nmax = int(1.05e7)
    print("Sequential Prime Finder Starting")
    sequential_prime_finder(nmin, nmax)
    print("Threading Prime Finder Starting")
    threading_prime_finder(nmin, nmax)
    print("Processing Prime Finder Starting")
    processing_prime_finder(nmin, nmax)
    print("Thread Executor Prime Finder Starting")
    thread_executor_prime_finder(nmin, nmax)
    print("Process Executor Finder Starting")
    process_executor_prime_finder(nmin, nmax)

main()

Aici sunt rezultatele pe Mac OSX 4 core mașină

Sequential Prime Finder Starting
9.708213827005238 seconds
Threading Prime Finder Starting
9.81836523200036 seconds
Processing Prime Finder Starting
3.2467174359990167 seconds
Thread Executor Prime Finder Starting
10.228896902000997 seconds
Process Executor Finder Starting
2.656402041000547 seconds
Comentarii (4)

Aici este foarte simplu exemplu de import CSV folosind filetare. [Biblioteca de incluziune pot să difere pentru diferite scop ]

Funcțiile Helper:

from threading import Thread
from project import app 
import csv

def import_handler(csv_file_name):
    thr = Thread(target=dump_async_csv_data, args=[csv_file_name])
    thr.start()

def dump_async_csv_data(csv_file_name):
    with app.app_context():
        with open(csv_file_name) as File:
            reader = csv.DictReader(File)
            for row in reader:
                #DB operation/query

Funcții De Conducere:

import_handler(csv_file_name) 
Comentarii (0)

Multi threading cu simplu exemplu care va fi de ajutor. Puteți rula și de a înțelege cu ușurință cum este multi firul de lucru în python. Am folosit de blocare pentru a preveni accesul altor fire până anterioare fire terminat munca lor. Prin utilizarea de

tLock = filetare.BoundedSemaphore(valoare=4)

această linie de cod puteți permite numere de proces la un moment dat și țineți apăsat pentru restul de fir care se va desfășura mai târziu sau după ce a terminat anterior procese.

import threading
import time

#tLock = threading.Lock()
tLock = threading.BoundedSemaphore(value=4)
def timer(name, delay, repeat):
    print  "\r\nTimer: ", name, " Started"
    tLock.acquire()
    print "\r\n", name, " has the acquired the lock"
    while repeat > 0:
        time.sleep(delay)
        print "\r\n", name, ": ", str(time.ctime(time.time()))
        repeat -= 1

    print "\r\n", name, " is releaseing the lock"
    tLock.release()
    print "\r\nTimer: ", name, " Completed"

def Main():
    t1 = threading.Thread(target=timer, args=("Timer1", 2, 5))
    t2 = threading.Thread(target=timer, args=("Timer2", 3, 5))
    t3 = threading.Thread(target=timer, args=("Timer3", 4, 5))
    t4 = threading.Thread(target=timer, args=("Timer4", 5, 5))
    t5 = threading.Thread(target=timer, args=("Timer5", 0.1, 5))

    t1.start()
    t2.start()
    t3.start()
    t4.start()
    t5.start()

    print "\r\nMain Complete"

if __name__ == "__main__":
    Main()
Comentarii (0)

Niciuna dintre soluțiile de mai sus, de fapt, folosit de mai multe nuclee pe GNU/Linux server (în cazul în care nu't ai drepturi de admin). Au fugit pe un singur nucleu. Am folosit mai mic nivel de `os.furculiță pentru interfata să ruleze mai multe procese. Acesta este codul care a lucrat pentru mine:

from os import fork

values = ['different', 'values', 'for', 'threads']

for i in range(len(values)):
    p = fork()
    if p == 0:
        my_function(values[i])
        break
Comentarii (0)

Aș dori să contribuie cu un exemplu simplu și explicații am'am găsit util atunci când am avut de a aborda aceasta problema.

În acest răspuns, veți găsi unele informații despre Python's GIL (Global Interpret de Blocare) și o simplă zi cu zi de exemplu scris folosind multiprocesare.dummy plus unele repere simple.

Global Interpret de Blocare (GIL)

Python nu't permite multi-threading în adevăratul sens al cuvântului. Ea are un multi-threading pachet dar dacă vrei să-multi-thread pentru a accelera codul, apoi se's, de obicei, nu este o idee bună să-l folosească. Python are un construct numit Global Interpret de Blocare (GIL). GIL face sigur că doar unul din 'fire' poate executa la un moment dat. Un fir dobândește GIL, face un pic de lucru, apoi trece la GIL pe lângă fir. Acest lucru se întâmplă foarte repede, astfel încât ochiul uman nu poate părea ca subiectele sunt de executare în paralel, dar ei sunt doar ținând transformă folosind același CPU core. Toate acestea GIL trecerea adaugă aeriană de la executie. Acest lucru înseamnă că, dacă doriți pentru a face codul alerga mai repede apoi, folosind filetare pachet de multe ori e't o idee bună.

Există motive pentru a folosi Python's filetare pachet. Dacă doriți să rulați unele lucruri simultan, iar eficiența nu este un motiv de îngrijorare, apoi se's în regulă și convenabil. Sau dacă executați codul care trebuie să așteptați pentru ceva (cum ar fi unele IO), atunci s-ar putea face o mulțime de sens. Dar filetare biblioteca a câștigat't să utilizați suplimentar de nuclee CPU.

Multi-threading pot fi externalizate către sistemul de operare (de a face multi-processing), unele aplicații externe care solicită codul Python (de exemplu, Scânteie sau Hadoop), sau un cod care Python codul de apeluri (de exemplu: ai putea să-ți cod Python sun o C funcția care face scumpe multi-threaded chestii).

De Ce Acest Lucru Contează

Pentru că o mulțime de oameni petrec o mulțime de timp încercând să găsească blocaje în lor de lux Python multi-threaded de cod înainte de a învăța ce-i GIL.

Odată ce această informație este clar aici's codul meu:

#!/bin/python
from multiprocessing.dummy import Pool
from subprocess import PIPE,Popen
import time
import os

# In the variable pool_size we define the "parallelness".
# For CPU-bound tasks, it doesn't make sense to create more Pool processes
# than you have cores to run them on.
#
# On the other hand, if you are using I/O-bound tasks, it may make sense
# to create a quite a few more Pool processes than cores, since the processes
# will probably spend most their time blocked (waiting for I/O to complete).
pool_size = 8

def do_ping(ip):
    if os.name == 'nt':
        print ("Using Windows Ping to " + ip)
        proc = Popen(['ping', ip], stdout=PIPE)
        return proc.communicate()[0]
    else:
        print ("Using Linux / Unix Ping to " + ip)
        proc = Popen(['ping', ip, '-c', '4'], stdout=PIPE)
        return proc.communicate()[0]

os.system('cls' if os.name=='nt' else 'clear')
print ("Running using threads\n")
start_time = time.time()
pool = Pool(pool_size)
website_names = ["www.google.com","www.facebook.com","www.pinterest.com","www.microsoft.com"]
result = {}
for website_name in website_names:
    result[website_name] = pool.apply_async(do_ping, args=(website_name,))
pool.close()
pool.join()
print ("\n--- Execution took {} seconds ---".format((time.time() - start_time)))

# Now we do the same without threading, just to compare time
print ("\nRunning NOT using threads\n")
start_time = time.time()
for website_name in website_names:
    do_ping(website_name)
print ("\n--- Execution took {} seconds ---".format((time.time() - start_time)))

# Here's one way to print the final output from the threads
output = {}
for key, value in result.items():
    output[key] = value.get()
print ("\nOutput aggregated in a Dictionary:")
print (output)
print ("\n")

print ("\nPretty printed output:")
for key, value in output.items():
    print (key + "\n")
    print (value)
Comentarii (0)

Cu împrumut de la acest mesaj știm despre alegerea între multithreading, multiprocesare, și asincron și utilizarea lor.

Python3 are o nouă bibliotecă built-in pentru concurență și paralelism: concurente.futures

Așa că am demonstra printr-un experiment la care rulează patru sarcini (de exemplu .sleep ()) metoda de Filetare-Pool` mod:

from concurrent.futures import ThreadPoolExecutor, as_completed
from time import sleep, time

def concurrent(max_worker=1):
    futures = []

    tick = time()
    with ThreadPoolExecutor(max_workers=max_worker) as executor:
        futures.append(executor.submit(sleep, 2))
        futures.append(executor.submit(sleep, 1))
        futures.append(executor.submit(sleep, 7))
        futures.append(executor.submit(sleep, 3))

        for future in as_completed(futures):
            if future.result() is not None:
                print(future.result())

    print('Total elapsed time by {} workers:'.format(max_worker), time()-tick)

concurrent(5)
concurrent(4)
concurrent(3)
concurrent(2)
concurrent(1)

Out:

Total elapsed time by 5 workers: 7.007831811904907
Total elapsed time by 4 workers: 7.007944107055664
Total elapsed time by 3 workers: 7.003149509429932
Total elapsed time by 2 workers: 8.004627466201782
Total elapsed time by 1 workers: 13.013478994369507

[NOTĂ]:

  • După cum puteți vedea în rezultatele de mai sus, cel mai bun caz a fost 3 muncitori pentru cei patru sarcini.
  • Dacă aveți o sarcină proces în loc de I/O bound (folosind "subiect") ai putea schimba ThreadPoolExecutor " cu " ProcessPoolExecutor
Comentarii (0)
import threading
import requests

def send():

  r = requests.get('https://www.stackoverlow.com')

thread = []
t = threading.Thread(target=send())
thread.append(t)
t.start()
Comentarii (5)