logo MSJO.kr

Rust, Concurrency and Channels

2020-12-02
MsJ

Rust 프로그래밍에서 concurrency 모델의 핵심 메커니즘을 비교적 간단한 소스 코드를 통하여 단계별로 정리해보았다1. Go(lang)에서 goroutine, channel을 사용하여 함수와 메소드의 동시성을 구현할 수 있게 해주는 것2처럼 Rust에서는 ‘Concurrency, Threads, Channels, Mutex and Arc’로 동시성을 구현할 수 있다.

기본 Thread 구현 및 한계
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
use std::time::Duration;

fn main() {
    let v = vec![1, 2, 3];

    let handle = thread::spawn(move || {
        println!("vector: {:?}", v);
    });

    // - error : value borrowed here after move
    // println!("{:?}", v);
    // - channel을 이용하여 해결    

    handle.join().unwrap();
}
fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        tx.send(42).unwrap();
    });

    println!("get {}", rx.recv().unwrap());
}
Channels, Mutex 확장
const NUM_TIMTERS: usize = 10;

fn timer(d: usize, tx: mpsc::Sender<usize>) {
    thread::spawn(move || {
        println!("{}: setting timer...", d);
        thread::sleep(Duration::from_secs(d as u64));
        println!("{}: sent!", d);
        tx.send(d).unwrap();
    });
}

fn main() {
    let (tx, rx) = mpsc::channel();
    for i in 0..NUM_TIMTERS {
        timer(i, tx.clone());
    }

    for v in rx.iter().take(NUM_TIMTERS) {
        println!("{}: received!", v);
    }
}
fn main() {
    let c = Arc::new(Mutex::new(0));
    let mut hs = vec![];

    for _ in 0..10 {
        let c = Arc::clone(&c);
        let h = thread::spawn(move || {
            let mut num = c.lock().unwrap();
            *num += 1;
            println!("{}", num);
        });
        hs.push(h);
    }

    for h in hs {
        h.join().unwrap();
    }

    println!("Result: {}", *c.lock().unwrap());
}
동시성 사용 예제
fn is_prime(n: usize) -> bool {
    return (2..n).all(|i| n % i != 0);
}

fn producer(tx: mpsc::SyncSender<usize>) -> thread::JoinHandle<()> {
    return thread::spawn(move || {
        for i in 100_000_000.. {
            tx.send(i).unwrap();
        }
    });
}

fn worker(id: u64, shared_rx: Arc<Mutex<mpsc::Receiver<usize>>>) {
    thread::spawn(move || loop {
        {
            let mut n = 0;
            match shared_rx.lock() {
                Ok(rx) => match rx.try_recv() {
                    Ok(_n) => {
                        n = _n;
                    }
                    Err(_) => (),
                },
                Err(_) => (),
            }

            if n != 0 {
                if is_prime(n) {
                    println!("workder {} found a prime: {}", id, n);
                }
            }
        }
    });
}

fn main() {
    let (tx, rx) = mpsc::sync_channel(1024);
    let shared_rx = Arc::new(Mutex::new(rx));

    for i in 1..13 {
        worker(i, shared_rx.clone());
    }

    producer(tx).join().unwrap();
}

Arc<T>(Atomic Reference Counting)는 Mutxt에서 동시적 상황을 안전하게 사용하게 해주는 Rc<T> 타입을 말한다3.

용어정리
  • 동시성(Concurrency) : 어느 한 순간에 하나 이상의 것을 하는 것
  • 멀티스레딩(Multithreading) : 여러 스레드를 사용하는 동시성의 한 형태
  • 병렬 처리(Parallel Processing) : 동시에 작동하는 여러 스레드 사이에서 많은 작업들을 분할하여 처리하는 것
  • 비동기 프로그래밍(Asynchronous Programming) : 불필요한 스레드 사용을 피하기 위해 future 혹은 콜백을 사용하는 동시성의 한 형태
  • 반응성 프로그래밍(Reactive Programming) : 애플리케이션이 이벤트에 반응하는 선언적 스타일의 프로그래밍4

Go(lang)에서 Concurrency 구현 예제는 ‘Go, REST API with Mux‘를 참고하자5.

Reference
  1. Tensor Programming, “Rust-lang (Concurrency, Threads, Channels, Mutex and Arc)”
  2. 후니의 컴퓨터, “goroutine and channel”
  3. The Rust Programming Language, “공유 상태 동시성”
  4. 비블레리, “Concurrency in C# Cookbook”
  5. MSJO.kr, “Go, REST API with Mux”

Prεv(Θld)   Nεxt(Nεw)
Content
Search     RSS Feed     BY-NC-ND