티스토리 뷰

반응형

 

 

AsyncStream structure 정의

swift 5.5 부터 소개된 Swift Conrreucy의 개념으로 AsyncStream이 있습니다. iOS13부터 지원을 하고 있습니다.

AsyncStream은 클로저를 통해 생성된 비동기 시퀀스를 정의합니다. AsyncStream을 통해 제공되는 closure 내에서 continuation을 통해 다수의 새로운 값을 생산하고, 이를 async 하게 처리할 수 있습니다.

AsyncStream은 구조체로 정의되어 있으며, Element라는 제네릭 타입을 갖고 있습니다.

 

 


Overview

AsyncStream은 기본적으로 AsyncSequence를 준수하고 있습니다. AsyncSequence를 준수하기 위해서는 몇가지 사항을 준수해야하는데, 이를 수동적으로 준수하지 않더라도 쉽게 Async하게 Sequence를 사용할 수 있습니다.

AsyncStream을 초기화할때, AsyncStream.Continuation을 받는 클로져를 함께 사용하게 됩니다. 이때 클로져 내부에서 continuation의 yield(_:)를 사용해서 element를 생산할 수 있고, 더이상 element를 생산할 일이 없다면, continuation에서 finish()를 호출할 수 있습니다.

finish()를 호출하게 되면 해당 AsyncStream의 Async Sequence를 종료하게 됩니다. (이때 continuation에서는 Sendable을 준수하여, 외부의 동시성 컨텍스트에서의 data races 문제 등으로부터 안전하게 호출이 가능하도록 합니다.)

element의 임의 소스의 경우, AsyncStream을 iterating 하는 것보다 더 빠르게 element를 생산할 수도 있습니다. 이런 경우에는 AsyncStream의 bufferingPolicy를 통해서 buffer 크기를 설정할 수 있습니다. 기본적인 bufferingPolicy 설정 값은 unbounded로, buffer 제한은 Int.max로 지정되어있습니다. 그 외에도 최신순 버퍼, 오래된순 버퍼 크기를 지정해서 사용할 수 도 있습니다.

 


AsyncStream 사용방법

개발자 문서 개요에도 나왔듯이, AsyncStream을 사용해서 AsyncSequence를 쉽게 활용할 수 있습니다. 또한 AsyncStream을 생성할때 제공하는 closure 내에서 continuation을 통해 element를 생산하거나, AsyncSequence를 끝낼수도 있습니다. 만약 에러 처리까지 하고 싶다면, AsyncThrowingStream을 사용해서 continuation을 통해 failure 이벤트를 AsyncThrowingStream에 제공할 수도 있습니다.

이렇게 연속적인 비동기 동작을 AsyncSequence 방식으로 처리하고 싶을때 활용가능한 AsyncStream, AsyncThrowingStream에 대해서 실제로 사용하는 예시도 알아보겠습니다.

import UIKit

class BitcoinPriceMonitor {
  
  var price: Double = 0.0
  var timer: Timer?
  var priceHandler: (Double) -> Void = { _ in }
  
  // Timer 설정에 #selector로 지정되기 위해서는 @objc를 붙여서 Objective-C runtime에서 소통가능하도록 해주어야 합니다.
  @objc func getPrice() {
    priceHandler(Double.random(in: 20000...40000))
  }
  
  func startUpdating() {
    timer = Timer.scheduledTimer(timeInterval: 1.0, target: self, selector: #selector(getPrice), userInfo: nil, repeats: true)
  }
  
  func stopUpdating() {
    timer?.invalidate()
  }
}

위 코드는 Bitcoin 가격을 모니터링하는 클래스 예시입니다. startUpdating() 메서드가 호출되면 1초 마다 20000 ~ 40000의 숫자 중 임의의 숫자를 priceHandler closure를 통해 방출합니다. stopUpdating() 메서드가 호출되면 1초 주기 타이머가 중단되면서 숫자 방출도 중단됩니다.

 

let bitcoinPriceStream = AsyncStream(Double.self) { continuation in
  let bitcoinPriceMonitor = BitcoinPriceMonitor()
  bitcoinPriceMonitor.priceHandler = {
    // AsyncSequence에 제공될 값을 전달할때 yield를 사용
    continuation.yield($0)
  }
  // continuation을 통해 onTermination callback 클로져를 설정 가능
  // continuation.onTermination = { _ in }
  bitcoinPriceMonitor.startUpdating()
}

Task {
  // AsyncStream을 사용할때 장점
  // 1) 콜백 스트림들을 AsyncSequene로 변환해서 사용할 수 있는데 이때 기존 Sequence를 채택한 애들이 사용가능한 다양한 연산자를 함께 활용 가능하다.
  // 2) Async/Await하게 동시성 프로그래밍을 할 수 있다.
  for await bitcoinPrice in bitcoinPriceStream {
    print(bitcoinPrice)
  }
}

위 코드는 앞서 BitcoinMonitor에서 방출되는 가격정보를 AsyncSequence 방식으로 처리하도록 AsyncStream을 활용하는 예시입니다. AsyncStream에서 제공하는 closure 내부에서 BitcoinPriceMotifor의 priceHandler를 사용하고 있습니다. 가격이 방출될때마다 continuation.yield($0)를 통해 가격정보를 AsyncStream에 제공하고 있습니다. 

이어서 아래에서 (unstructured concurrency)Task 블럭 내에서 AsyncSequence를 처리하는 모습을 볼 수 있습니다. for - await - in 방식으로 사용을 하는 것을 볼 수 있는데요,

 


AsyncThrowingStream 사용 예시

만약 에러가 던져질 수 있는 AsyncThrowingStream이라면, for - try await - in 방식으로 AsyncSequence를 활용할 수 있습니다.

위 코드를 예시로 보면, 특정 element를 반환하는 것 뿐만 아니라, 에러도 던질 수 있는 것을 볼 수 있어요. 또한 continuation을 사용하는 방식 이외에도 클로져에서 지정된 Element의 타입을 반환할 수도 있습니다~!

 

 


오늘은 Swift Concurrency 개념 중 하나로, AsyncSequence protocol을 기본적으로 준수하고 있는 AsyncStream에 대한 개발자문서 개요를 보면서 함께 실제 사용 예시도 돌아보았습니다.

withCheckedContinuation, withTaskGroup 등과 더불어 복잡한 비동기동작을 처리할때 활용가능한 개념이라고 생각합니다. 피드백 환영합니다. 감사합니다! ☺️

 

Reference

 

AsyncStream | Apple Developer Documentation

An asynchronous sequence generated from a closure that calls a continuation to produce new elements.

developer.apple.com

 

반응형
댓글
반응형
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2024/11   »
1 2
3 4 5 6 7 8 9
10 11 12 13 14 15 16
17 18 19 20 21 22 23
24 25 26 27 28 29 30
글 보관함