多线程观察者模式

Multithreaded Observer Pattern

本文关键字:观察者模式 多线程      更新时间:2023-10-16

我有一个问题,每次都会在不同的线程中更新主题。因此,每当主题被更新时,它就会相应地用新信息更新观察者。然而,如果观察员名单很长,则需要一些时间来更新所有观察员。想想一个经常更新的主题。当主题更新观察者时,"主题"对象被锁定,因此无法由其他线程更新。这将为主题创建信息流量或导致信息丢失。

你知道在多线程环境中如何处理这些问题吗?另外,有人能推荐一些关于C++并行编程的书吗?

考虑使用生产者-消费者队列或消息队列。例如,您可以通过两种方式使用队列:

  1. 对主题的更改已排队。当某个内容更新主题时,它会将新状态放入队列并立即返回。这样,在通知观察者时,更新程序不会阻塞。您将需要一个线程来不断地将状态更改排成队列并更新观察者。

  2. 给观察者的通知已排队。每个观察者都有一个队列,在其中发布主题状态更改通知。

如果你正在使用Qt库,你可以使用信号&具有Qt::QueuedConnection连接类型的slots机制。插槽通过接收器的事件队列,并在接收器的线程中执行。这样,当接收器执行它们各自的时隙时,发送器不进行阻塞。

您的程序可能是Actor模型(范例)的一个很好的候选者。以下是一些实现actor模型的C++库:

  • Theron
  • libcppa(基于C++11)
  • 异步代理库(Microsoft)

您的程序也可能是数据流范例的一个很好的候选者。查看建议的Boost数据流库,它支持线程处理。


我没有什么书可以推荐,但可以看看Herb Sutter关于C++并发的Dr Dobbs系列文章。

我在Java 中编写了一个多线程观察器模式

import java.lang.reflect.Method;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
/**
 * An observer pattern that allows listeners to register(), unregister() in
 * multiple threads and also notify listeners in another thread.
 * 
 * A HashMap keeps track of the listeners and their status (active, obsolete).
 * When a listener unregister, its entry is marked as obsolete in this map.
 * 
 * During firing of an event, the observer notifies all the listeners that are
 * active, the active status will be stored in a Boolean that's synchronized so
 * rare race conditions like calling notify on an active listener that has just
 * turned obsolete will not happen.
 * 
 * 
 */
public class MultithreadedObserverPattern <T extends AbstractListener> {
    interface Handler<T> {
        void handleEvent(T listener);
    }
    class BooleanHolder {
        boolean val;
        BooleanHolder(boolean v) {
            val = v;
        }
        void set(boolean v) {
            val = v;
        }
        boolean get() {
            return val;
        }
    }
    Map<AbstractListener, BooleanHolder> listeners = new HashMap<AbstractListener, BooleanHolder>();
    public void register(AbstractListener l) {
        synchronized (listeners) {
            listeners.put(l, new BooleanHolder(true));
        }
    }
    public void unregister(AbstractListener l) {
        synchronized (listeners) {
            BooleanHolder status = listeners.get(l);
            if (status != null) {
                // notify call also syncing on status
                synchronized (status) {
                    status.set(false);
                }
            }
            // set to false
        }
    }
    public void notifyAll(Handler handler) {
        // here we do not synchroznie on listeners to avoid tricky lock situations
        // make a copy of the map
        List<Entry<AbstractListener, BooleanHolder>> activeListeners = new ArrayList<Entry<AbstractListener, BooleanHolder>>();
        List<AbstractListener> inactiveListeners = new ArrayList<AbstractListener>(); 
        synchronized (listeners) {
            for (Entry<AbstractListener, BooleanHolder> entry : listeners.entrySet()) {
                if (entry.getValue().get()) {
                    activeListeners.add(entry);
                } else {
                    inactiveListeners.add(entry.getKey());
                }
            }
        }
         // call the method on active listener
        // 
        for (Entry<AbstractListener, BooleanHolder> e : activeListeners) {
            BooleanHolder status = e.getValue();
            // remove those listeners that are no longer active
            synchronized (status) {
                if (status.get()) {
                      handler.handleEvent(e.getKey());
                }
            }
        }
        synchronized (listeners) {
            // remove inactive listeners
            for (AbstractListener l : inactiveListeners) {
                listeners.remove(l);
            }
        }
    }
}