@Override
public Subscription onSubscribe(Observer<? super T> observer) {
final rx.operators.SafeObservableSubscription subscription = new
SafeObservableSubscription();
subscription.wrap(new Subscription() {
@Override
public void unsubscribe() {
// on unsubscribe remove it from the map of outbound observers
// to notify
observers.remove(subscription);