티스토리 뷰

동시성 퓨처는 파이썬 3.2 에서 새로 추가된 기능이다.


자바 기반의 배경지식이 있다면 ThreadPoolExcutor 에 대해 익숙할 것이다.


동시성 퓨처는 TheadPoolExcutor 를 파이썬에 구현한 형태다.


멀티스레드 작업을 실행할 때, 가장 많은 연산을 필요로 하는 작업은 스레드를 시작하는 것이다.


TheadPoolExcutor 는 스레드 풀이 필요할 동안 이를 생성함으로써 문제를 다룬다.


사용자는 작업을 수행할 때 더 이상 스레드를 생성하거나 실행할 필요가 없으며, 덕분에 한번만 스레드를 다룬다.



[TheadPoolExcutor] 


from concurrent.futures import ThreadPoolExecutor


def task(n):
print(f"task number={n}")


if __name__ == '__main__':
executor = ThreadPoolExecutor(3)
executor.submit(task, 10)


인스턴스화해 최대 작업자 수를 인자로 취한다. 


ThreadPoolExcutor(3)


여기서는 3을 취해 스레드 풀이 사용자가 제출하는 작업을 처리하는 3개의 동시성 스레드만 갖는다.


submit() 함수를 호출해 함수를 인자로 취한다.



[context manager]


from concurrent.futures import ThreadPoolExecutor


def task(n):
print(f"task number={n}")


if __name__ == '__main__':
with ThreadPoolExecutor(3) as executor:
executor.submit(task, 10)


컨텍스트 매니저를 통해서


좀더 세련된 코드를 완성할 수 있다.



[map]


파이썬의 맵은 특정 함수를 iterables 내의 모든 요소를 적용해 준다.


from concurrent.futures import ThreadPoolExecutor


def task(n):
print(f"task number={n**2}")


if __name__ == '__main__':
with ThreadPoolExecutor(3) as executor:
executor.map(task, [i for i in range(10)])


리스트의 모든 요소가 count_prime 의 인자로 취하고


__exit__ 가 호출된 이후


result 에 결과를 iterables 형태로 갖게 된다.


결과값 받아오기

from concurrent.futures import ThreadPoolExecutor


def multiply_by_2(n):
return 2*n


def main():
values = [i for i in range(10)]
with ThreadPoolExecutor(2) as executor:
results = executor.map(multiply_by_2, values)
for result in results:
print(result)


if __name__ == '__main__':
main()

map 함수의 인자도 반복자로 받을 수 있는 것처럼 


리턴값으로도 결과들을 받을 수 있다.


results 를 받아서 결과를 출력해 보자




[shutdown]


실행중이 실행자를 종료하는 방법에 대해 알아보자


shutdown 메소드를 호출한 이후 부터는 작업이 실행자에 전달되지 않게 해보자


from concurrent.futures import ThreadPoolExecutor
import time


def task(n):
print(f"start id={n**2}")
time.sleep(1)
print(f"end id={n**2}")


if __name__ == '__main__':
with ThreadPoolExecutor(2) as executor:
try:
task1 = executor.submit(task, 1)
task2 = executor.submit(task, 2)
executor.shutdown(wait=True)
task3 = executor.submit(task, 3)
task4 = executor.submit(task, 4)
except RuntimeError as e:
print(e)
except Exception as e:
print(e)


여기서는 반드시 예외 처리를 해주자


그렇지 않으면 비정상 종료하기 때문이다.



[as_competed]


map 함수가 필요에 따라 적합하지 않을 수 있는데, 퓨처 객체의 배열 내 ThreadPoolExecutor 에 전달하는 모든 퓨


처 객체를 저장해야 할 수도 있다.


이러한 방법은 concurrent.futures 모듈의 as_completed 함수를 통해 실행자에 전달된 모든 작업 결과를 반환 할 


수 있다.



import random
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def count_prime(n):
def is_prime(x):
for i in range(2, x):
if not x % i:
return False
return True
return len([i for i in range(2, n+1) if is_prime(i)])


def main():
values = [random.randint(1, 100) for _ in range(10)]
tasks = list()
with ThreadPoolExecutor(2) as executor:
for value in values:
task = executor.submit(count_prime, value)
tasks.append(task)

for result in as_completed(tasks):
print(result.result())


if __name__ == '__main__':
main()


각 요청이 완료되면 프로그램은 퓨처 객체와 해당 상태를 출력한다.


여기서는 result 가 퓨처 객체에 해당한다.



[callback]


콜백을 이해하기 위해 누군가에게 시간이 꽤 걸리는 어떤 작업을 부탁하는 상황을 상항해 보자.


보통은 그 사람이 해당 작업을 끝낼 때까지 그냥 앉아서 기다리는게 아니라 다른 업무를 보고 있을 것이다.


대신에 작업을 완료 했을 때 다시 호출해주기를 요청할 것이다. 프로그래밍에서는 이러한 호출을 콜백(callback) 이


라고 하고 ThreadPoolExecutor 와의 결합을 이용한 개념이다.


ThreadPoolExecutor  에 작언을 전달하기 까지 다음과 같이 add_done_callback 을 사용한 함수로 콜백을 명시화 

한다.


from concurrent.futures import ThreadPoolExecutor, as_completed


def count_prime(n):
def is_prime(x):
for i in range(2, x):
if not x % i:
return False
return True
return len([i for i in range(2, n+1) if is_prime(i)])


def callee(self):
print("called callee")


def main():
with ThreadPoolExecutor(3) as executor:
future = executor.submit(count_prime, 100000)
future.add_done_callback(callee)


if __name__ == '__main__':
main()


콜백에 의해 불리는 callee 함수를 만들어 두자.


caller 은 부르는 쪽 callee 는 불리는 쪽이라고 할때 callee 를 add_done_callback 함수에 등록해 둔다.


futrue 객체에 등록된 작업을 완료 하였을 때 callee 를 실행하게 된다.



[exception]


자식 스레드에서 발생한 예외를 다루는 걸 살펴보자.


foo 라는 함수에서 타입을 체크해 raise 로 예외를 발생시켜 보자.


main 함수에서는 예외를 전달 받을 수있다.


main 함수에 try, except, else, finally 문을 이용해서 자식 스레드의 예외를 처리해 보자.


from concurrent.futures import ThreadPoolExecutor, as_completed


def foo(s):
if not type(s) == str:
raise Exception("value is not string")
else:
print(s)


def main():
with ThreadPoolExecutor(3) as executor:
task1 = executor.submit(foo, "a")
task2 = executor.submit(foo, "b")
task3 = executor.submit(foo, 2)
for future in as_completed([task1, task2, task3]):
print(future.result())


if __name__ == '__main__':
main()


위의 코드를 실행하면 3번째에 str 타입이 아니라 int 타입이라서 예외를 발생 시킨다.


자식 스레드에서 발생된 예외를 확인되는 것 볼 수 있다.



댓글