Package com.goodow.realtime.store.impl

Source Code of com.goodow.realtime.store.impl.SubscribeOnlyStore

/*
* Copyright 2014 Goodow.com
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package com.goodow.realtime.store.impl;

import com.goodow.realtime.channel.Bus;
import com.goodow.realtime.channel.Message;
import com.goodow.realtime.channel.impl.ReconnectBus;
import com.goodow.realtime.channel.impl.ReliableSubscribeBus;
import com.goodow.realtime.core.Handler;
import com.goodow.realtime.core.Platform;
import com.goodow.realtime.core.Registration;
import com.goodow.realtime.json.Json;
import com.goodow.realtime.json.JsonObject;
import com.goodow.realtime.operation.Transformer;
import com.goodow.realtime.operation.impl.CollaborativeOperation;
import com.goodow.realtime.operation.impl.CollaborativeTransformer;
import com.goodow.realtime.store.Document;
import com.goodow.realtime.store.Error;
import com.goodow.realtime.store.Model;
import com.goodow.realtime.store.channel.Constants;
import com.goodow.realtime.store.channel.Constants.Key;
import com.goodow.realtime.store.impl.DocumentBridge.OutputSink;

public class SubscribeOnlyStore extends MemoryStore {
  public SubscribeOnlyStore(Bus bus) {
    super(bus);
  }

  public SubscribeOnlyStore(String serverUri, JsonObject options) {
    this(new ReliableSubscribeBus(new ReconnectBus(serverUri, options), options));
  }

  @Override
  public void load(final String id, final Handler<Document> onLoaded,
      final Handler<Model> opt_initializer, final Handler<Error> opt_error) {
    bus.send(Constants.Topic.STORE, Json.createObject().set(Key.ID, id), new Handler<Message<JsonObject>>() {
      @Override
      public void handle(Message<JsonObject> message) {
        JsonObject body = message.body();
        if (!body.has(Key.VERSION)) {
          body.set(Key.VERSION, 0);
        }
        final DocumentBridge bridge =
            new DocumentBridge(SubscribeOnlyStore.this, id, body.getArray(Key.SNAPSHOT),
                               body.getArray(Key.COLLABORATORS), opt_error);
        onLoaded(id, opt_initializer, body.getNumber(Key.VERSION), bridge);
        if (body.getNumber(Key.VERSION) == 0) {
          bridge.createRoot();
          if (opt_initializer != null) {
            Platform.scheduler().handle(opt_initializer, bridge.getDocument().getModel());
          }
        }
        bridge.scheduleHandle(onLoaded, bridge.getDocument());
      }
    });
  }

  protected void onLoaded(final String id, Handler<Model> opt_initializer, double version,
      final DocumentBridge bridge) {
    String topic = Constants.Topic.STORE + "/" + id + Constants.Topic.WATCH;
    if (bus instanceof ReliableSubscribeBus) {
      ((ReliableSubscribeBus) bus).synchronizeSequenceNumber(topic, version - 1);
    }
    final Registration handlerReg =
        bus.subscribe(topic, new Handler<Message<JsonObject>>() {
          Transformer<CollaborativeOperation> transformer = new CollaborativeTransformer();

          @Override
          public void handle(Message<JsonObject> message) {
            JsonObject body = message.body();
            CollaborativeOperation op = transformer.createOperation(body);
            bridge.consume(op);
          }
        });
    bridge.setOutputSink(new OutputSink() {
      @Override
      public void close() {
        handlerReg.unregister();
      }

      @Override
      public void consume(CollaborativeOperation op) {
      }
    });
  }
}
TOP

Related Classes of com.goodow.realtime.store.impl.SubscribeOnlyStore

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.