Executando verificação de segurança...
5

Não sei a natureza do seu projeto, mas outra opção interessante do módulo multiprocessing é usar um pool em vez de criar os processos um a um:

from multiprocessing.pool import ThreadPool

# cria um pool com 10 threads
with ThreadPool(10) as pool:
    for result in pool.imap_unordered(fibonacci, [n] * qtd_threads):
        # faz algo com o resultado

Assim, ele vai chamar a função fibonacci várias vezes, e a cada execução passa um dos valores da lista como argumento - no caso, usei uma lista contendo várias vezes o mesmo número n (o tamanho da lista é igual a quantidade de threads).

E em vez de ThreadPool, pode usar também um Pool. A diferença é que o primeiro usa threads, enquanto o segundo usa processos.

Neste caso estou passando como parâmetro 10, ou seja, o pool terá no máximo 10 threads/processos ao mesmo tempo. Quando um termina, é reusado na próxima execução. Pode parecer pior, mas veremos abaixo que faz diferença quando a quantidade total de execuções é muito grande.

E neste exemplo usei imap_unordered, que retorna os resultados em um iterável sem ordem definida. Mas se a ordem for importante, existem outras opções. Sugiro ler este artigo que explica em detalhes todas as opções existentes.


Outra opção é usar o módulo concurrent.futures, que também possui opções para usar threads ou processos (basta usar, respectivamente, ThreadPoolExecutor e ProcessPoolExecutor):

from concurrent.futures import ThreadPoolExecutor, as_completed

# usar threads (*** troque por ProcessPoolExecutor para usar processos ***)
# limitando a 10 threads no máximo
with ThreadPoolExecutor(max_workers=10) as executor:
    results = []
    for id in range(qtd_threads):
        results.append(executor.submit(fibonacci, n=30))
    for result in as_completed(results):
        # faz algo com o resultado

Testando

Fiz um teste comparativo simples, e com algumas alterações:

  • Troquei a implementação de fibonacci para não usar recursão, pois estava demorando demais. Usei um loop simples, que é bem mais rápido
  • Usei o módulo timeit, pois medir o tempo antes e depois é sujeito a muitos erros (existe por exemplo o tempo de inicialização da VM do Python, que precisa ser descartado, entre outros detalhes que o timeit já cuida pra vc)
  • Eliminei a função worker, e em vez disso eu passo a própria função fibonacci diretamente
  • Não usei print, pois I/O é demorado e interfere demais no teste
  • Para o Pool e Executor, usei no máximo 10 threads (ou seja, se o teste for com 4 threads, usa este valor; mas se tiver mais que 10 threads, limita o pool a 10)

O código completo do teste:

def fibonacci(n):
    a, b = 0, 1
    for i in range(n): 
        a, b = b, a+b
    return a

from multiprocessing.pool import ThreadPool, Pool
from multiprocessing import Process
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
import threading

def pool(qtd_threads, n):
    # cria um pool com qtd_threads (ou 10, se qtd_threads for maior)
    with Pool(min(10, qtd_threads)) as pool:
        for result in pool.imap_unordered(fibonacci, [n] * qtd_threads):
            pass

def threadpool(qtd_threads, n):
    # cria um pool com qtd_threads (ou 10, se qtd_threads for maior)
    with ThreadPool(min(10, qtd_threads)) as pool:
        for result in pool.imap_unordered(fibonacci, [n] * qtd_threads):
            pass

def process(qtd_threads, n):
    processes = []
    for i in range(qtd_threads):
        process = Process(target=fibonacci, args=(n,))
        processes.append(process)
        process.start()

    for process in processes:
        process.join()

def thread(qtd_threads, n):
    threads = []
    for i in range(qtd_threads):
        thread = threading.Thread(target=fibonacci, args=(n,))
        threads.append(thread)
        thread.start()

    for thread in threads:
        thread.join()

def t_executor(qtd_threads, n):
    with ThreadPoolExecutor(max_workers=min(10, qtd_threads)) as executor:
        results = []
        for id in range(qtd_threads):
            results.append(executor.submit(fibonacci, n=n))
        for result in as_completed(results):
            pass

def p_executor(qtd_threads, n):
    with ProcessPoolExecutor(max_workers=min(10, qtd_threads)) as executor:
        results = []
        for id in range(qtd_threads):
            results.append(executor.submit(fibonacci, n=n))
        for result in as_completed(results):
            pass

from timeit import timeit

n = 30
# executa 10 vezes cada teste
params = { 'number': 10, 'globals': globals() }

# testar com várias quantidades de threads
for qtd_threads in [4, 40, 400]:
    print(f'----------------\nTestando com {qtd_threads} threads:')
    results = []
    # para cada função, computa o tempo e guarda na lista de resultados
    for func in ['pool', 'threadpool', 'process', 'thread', 't_executor', 'p_executor']:
        results.append((timeit(f'{func}(qtd_threads, n)', **params), func))
    results.sort() # ordena (os mais rápidos ficam antes)
    for t, func in results: # mostra do mais rápido para o mais lento
        print(f"{func:<12}{t}")

O timeit retorna o tempo em segundos, ou seja, quanto menor, mais rápido. Testei com diferentes quantidades de threads (4, 40 e 400), sempre com o mesmo n. Depois eu ordeno os resultados e mostro do mais rápido para o mais lento.

Lembrando que os tempos podem variar de acordo com o hardware. Na minha máquina o resultado foi:

----------------
Testando com 4 threads:
thread      0.0016444469802081585
t_executor  0.0018067560158669949
threadpool  0.008496281981933862
process     0.034114020003471524
pool        0.04882860102225095
p_executor  0.04980076797073707
----------------
Testando com 40 threads:
t_executor  0.00976670702220872
threadpool  0.013376855989918113
thread      0.014256601978559047
pool        0.10632128297584131
p_executor  0.1449061709572561
process     0.29999881994444877
----------------
Testando com 400 threads:
threadpool  0.023624072957318276
t_executor  0.0517999540315941
thread      0.14251292095286772
pool        0.18709083297289908
p_executor  0.4440593729959801
process     2.918029392021708

Ou seja, com 4 threads, usar threading.Thread foi mais rápido, mas o ThreadPoolExecutor não ficou muito atrás.

Com 40 threads, o executor e o pool ficam mais rápidos, embora a Thread ainda fique bem próxima.

Mas com 400 threads, a diferença já aumenta consideravelmente (Thread começa a ficar bem para trás).

E note também como o multiprocessing.Process acaba sendo muito pior, conforme a quantidade aumenta. O que acontece é que criar muitos processos pode acabar onerando a máquina (em termos de memória, ou até mesmo podendo estourar o máximo que o sistema operacional aguenta), enquanto que o pool reaproveita threads e pode economizar recursos.

Só pra ter uma ideia, tentei com 4000 threads, e o multiprocessing.Process estourou a quantidade máxima e deu erro. E nos demais, os pools se saíram bem melhor (exceto por um deles):

Testando com 4000 threads:
threadpool  0.11382068699458614
t_executor  0.5200726269977167
pool        0.8536747829639353
thread      1.3032061050180346
p_executor  3.0360822250368074

Note como a solução com threading.Thread piorou bem mais que os pools de threads. E o ProcessPoolExecutor conseguiu ser ainda pior (faz sentido porque processos são mais pesados que threads - mas ainda sim, veja como o pool de processos conseguiu ser melhor que a thread pura, o que mostra que usar um pool tem lá suas vantagens).

Mas claro que os parâmetros exatos dependem de cada caso, do que cada thread faz, etc. Este foi um teste simples com uma função pequena que sempre calcula o mesmo valor. Mas no fim o que vale é o teste em condições reais, e aí vc tem que avaliar as opções e ver qual se encaixa melhor no problema.

Por exemplo, mudei o máximo de threads de 10 para 30 e os tempos do pool melhoraram um pouco. Mas não é garantido que vai acontecer o mesmo no seu caso. Não existem números mágicos, sempre depende de vários fatores que vc só vai encontrar no caso real.

Não é um assunto simples, e nem sempre adicionar mais threads resolve o problema (tem vezes que pode piorar, inclusive).

Carregando publicação patrocinada...