Objective-Cで、複数スレッドで並行処理させる。(Producer-Consumerパターン)

今日は、デザインパターンで言うところのProducer-ConsumerパターンをObjective-Cで実装してみます。(GCDで並列化する方法もありますが、今回は使いません。)


複数の処理があった場合、処理を並行に処理したいようなことがあります。
例えば、Webへのリクエストなど、レスポンスの待ちなどが発生する場合は、直列で処理するよりも並行で処理する方が格段に処理速度を改善できる場合があります。


イメージはこんな形、Producer(仕事を作る人)、Queue(仕事を置く台)、Consumer(仕事を処理する人)という登場人物がいます。
Produceが仕事を作って、作業台に置くと、作業者が仕事を取って処理を行うという動作を行います。





まずは、Queueクラス。仕事の受け渡しを行うクラスです。仕事を置くputQueメソッドと、仕事を取り出すgetQueメソッドがあります。
それぞれ、同じ仕事を2人で取ったりしないように、出し入れする際はNSLockクラスのロックをとれた時に操作できるようになっています。


Queue.h

#import "Queue.h"

typedef enum {
    StateIdle,
    StateRunning,
    StateSuspend,
} threadState;

@interface ConsumerThread : NSObject {
    volatile threadState state_;     // 必ず volatile にすること
    NSCondition *cond_;              // state を保護し、状態変化を伝える
    NSString *identifier_;
    Queue *que_;
}

- (id)initWithQue:(Queue*)que identifier:(NSString*)identifier;
- (void)start;
- (void)suspend;
- (void)quit;

- (void)run;

@end



Queue.m

#import "Queue.h"

@implementation Queue

- (id)initWithQue:(NSMutableArray*)queue
{
    theLock_ = [[NSLock alloc] init];
    queObject_ = queue;
    return self;
}

- (void)putQue:(NSObject*)object
{
    [theLock_ lock];
    while (queObject_.count >= MAX_QUEUE_COUNT) {
        // 最大Queue数の仕事が溜まっている場合はwaitする。
        [theLock_ wait];
    }
    // 仕事(後ろに)を追加
    [queObject_ addObject:object];
    [theLock_ unlock];
}

- (NSObject*)getQue
{
    NSObject *firstObject = nil;
    [theLock_ lock];
    if (queObject_.count > 0) {
        // 先頭から取り出し削除
        firstObject = [queObject_ objectAtIndex:0];
        [queObject_ removeObjectAtIndex:0];
        // waitを解除
        [theLock_ signal];
    }
    [theLock_ unlock];
    // QueueがあったらObjectを返す。無かったらnilを返す。
    return firstObject;
}

@end





次は、ConsumerThreadクラスです。これは並行度分生成され実行されます。
performSelectorInBackgroundで呼び出される、runメソッドが別Threadで実行されるようになっています。
runメソッドは、Queからひとつ仕事を取り出して処理を行います。


ConsumerThread.h

#import "Queue.h"

typedef enum {
    StateIdle,
    StateRunning,
    StateSuspend,
} threadState;

@interface ConsumerThread : NSObject {
    volatile threadState state_;     // 必ず volatile にすること
    NSCondition *cond_;              // state を保護し、状態変化を伝える
    NSString *identifier_;
    Queue *que_;
}

- (id)initWithQue:(Queue*)que identifier:(NSString*)identifier;
- (void)start;
- (void)suspend;
- (void)quit;

- (void)run;

@end

ConsumerThread.m

#import "ConsumerThread.h"

@interface ConsumerThread (PrivateMethods)
- (void)run;
@end

@implementation ConsumerThread

- (id)initWithQue:(Queue*)que identifier:(NSString*)identifier{
    self = [super init];
    if (self != nil) {
        cond_ = [[NSCondition alloc] init];
        state_ = StateIdle;
        identifier_ = identifier;
        que_ = que;
    }
    return self;
}

- (void)start {
    [cond_ lock];

    // StateIdleの場合のみ開始
    if (state_ == StateIdle) {
        state_ = StateRunning;
        [self performSelectorInBackground:@selector(run) withObject:nil];
    }

    [cond_ unlock];
}

- (void)suspend {
    [cond_ lock];

    if (state_ == StateRunning) {
        state_ = StateSuspend;
        while (state_ != StateIdle) {
            // 処理の終了を待つ。
            [cond_ wait];
        }
    }

    [cond_ unlock];
}

- (void)quit {
    [cond_ lock];

    if (state_ == StateRunning) {
        state_ = StateSuspend;
        while (state_ != StateIdle) {
            // 処理の終了を待つ。
            [cond_ wait];
        }
    }
    [cond_ unlock];
    // 終了処理
    // ・・・
}

- (void)run {
    @autoreleasepool {
        while (true) {
            // メインの処理
            NSObject *object = [que_ getQue];
            // QueueからObjectが取得できたら、処理を行う。
            if (object) {
                // なんかの処理
                [NSThread sleepForTimeInterval:1.0];
                NSLog(@"Thread(%@) : %@",identifier_, object);
            }
            if (state_ == StateSuspend) {
                break;
            }
        }

        [cond_ lock];
        if (state_ == StateSuspend) {
            // waitしているスレッドを起こす。
            [cond_ signal];
        }
        state_ = StateIdle;
        [cond_ unlock];
    }
}
@end





最後は、Produceクラス。
Queueクラス、ConsumerThreadクラスを生成します。


同じQueueを共有するため、それぞれのConsumerThreadに、同じQueueクラスのインスタンスを引数で渡し生成します。
ConsumerThreadクラスの開始はstartメソッド、停止はquitメソッドで行います。startすると別スレッドでrunメソッドが動きだし、Queueから仕事を取り出して処理を行います。
仕事(Queue)が無くなったのを確認して、ConsumerThreadクラスの停止を行います。
(Producerが単数のパターンを作成しましたが、Producerを複数生成しQueueを共有すれば複数にすることもできます。)

    // Queueの作成
    NSMutableArray *que = [NSMutableArray arrayWithObjects:@"1",@"2",@"3",@"4",@"5",nil];
    Queue *queue = (Queue*)[[Queue alloc] initWithQue:que];

    // Consumerの作成
    ConsumerThread *thread[MAX_THREAD_COUNT];
    for (int i=0; i < MAX_THREAD_COUNT; i++) {
        thread[i] = [[ConsumerThread alloc] initWithQue:queue identifier:[NSString stringWithFormat:@"%d",i]];
        // Consumerの作業開始
        [thread[i] start];
    }

    // 処理完了の待機
    while (que.count > 0) {
        [NSThread sleepForTimeInterval:1.0];
    }

    // Consumerの仕事終了
    for (int i=0; i < MAX_THREAD_COUNT; i++) {
        [thread[i] quit];
    }



Producer-Consumerパターンを使いたいなと思ったのですが、サンプルが見つからなかったので作成してみました。参考になれば幸いです。