Package io.vertx.rxcore.test.integration.java

Source Code of io.vertx.rxcore.test.integration.java.EventBusIntegrationTest

package io.vertx.rxcore.test.integration.java;

/*
* Copyright 2013 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License.  You may obtain a copy of the License at:
*
*    http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
* License for the specific language governing permissions and limitations
* under the License.
*
* @author <a href="http://tfox.org">Tim Fox</a>
*/

import java.util.concurrent.atomic.AtomicInteger;

import io.vertx.rxcore.RxSupport;
import io.vertx.rxcore.java.eventbus.RxEventBus;
import io.vertx.rxcore.java.eventbus.RxMessage;
import io.vertx.rxcore.java.eventbus.RxStream;
import io.vertx.rxcore.java.impl.Regulator;
import org.junit.Test;
import org.vertx.java.core.Handler;
import org.vertx.java.core.buffer.Buffer;
import org.vertx.java.core.eventbus.Message;
import org.vertx.java.core.json.JsonArray;
import org.vertx.testtools.TestVerticle;
import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.*;
import rx.subscriptions.Subscriptions;

import static io.vertx.rxcore.test.integration.java.RxAssert.assertCountThenComplete;
import static io.vertx.rxcore.test.integration.java.RxAssert.assertMessageThenComplete;
import static io.vertx.rxcore.test.integration.java.RxAssert.assertSingle;
import static org.vertx.testtools.VertxAssert.assertEquals;
import static org.vertx.testtools.VertxAssert.testComplete;

public class EventBusIntegrationTest extends TestVerticle {

  @Test
  public void testSimpleSubscribeReply() {
    RxEventBus rxEventBus = new RxEventBus(vertx.eventBus());
    rxEventBus.<String>registerHandler("foo").subscribe(new Action1<RxMessage<String>>() {
      @Override
      public void call(RxMessage<String> message) {
        message.reply("pong!");
      }
    });
    Observable<RxMessage<String>> obs = rxEventBus.send("foo", "ping!");
    obs.subscribe(new Action1<RxMessage<String>>() {
      @Override
      public void call(RxMessage<String> message) {
        assertEquals("pong!", message.body());
        testComplete();
      }
    });
  }

  @Test
  // Send some messages in series - i.e. wait for result of previous one before sending next one
  // PMCD: Added check to enforce 1-at-a-time
  //      
  public void testSimpleSerial() {
    final RxEventBus rxEventBus = new RxEventBus(vertx.eventBus());
    final AtomicInteger totalReqs = new AtomicInteger(3);
    final AtomicInteger activeReqs = new AtomicInteger(0);
   
    rxEventBus.<String>registerHandler("foo").subscribe(new Action1<RxMessage<String>>() {
      @Override
      public void call(RxMessage<String> message) {
        System.out.println("serial-foo["+message.body()+"]");
        message.reply("pong!");
        activeReqs.incrementAndGet();
      }
    });

    Observable<RxMessage<String>> obs1 = rxEventBus.observeSend("foo", "ping!");
    Observable<RxMessage<String>> obs2 = rxEventBus.observeSend("foo", "ping!");
    Observable<RxMessage<String>> obs3 = rxEventBus.observeSend("foo", "ping!");

    Observable<RxMessage<String>> concatenated = Observable.concat(obs1, obs2, obs3);

    concatenated.subscribe(new Action1<RxMessage<String>>() {
      @Override
      public void call(RxMessage<String> message) {
        System.out.println("serial-resp["+message.body()+"]");
        assertEquals("pong!", message.body());
        assertEquals(0,activeReqs.decrementAndGet());
        if (totalReqs.decrementAndGet()==0)
          testComplete();
      }
    });
  }

  @Test
  // Send some messages in series where next message sent is function of reply from previous message
  public void testSerial() {
    final RxEventBus rxEventBus = new RxEventBus(vertx.eventBus());

    rxEventBus.<String>registerHandler("foo").subscribe(new Action1<RxMessage<String>>() {
      @Override
      public void call(RxMessage<String> message) {
        message.reply(message.body());
      }
    });

    Observable<RxMessage<String>> obs1 = rxEventBus.send("foo", "ping1");
    Observable<RxMessage<String>> obs2 = obs1.flatMap(new Func1<RxMessage<String>, Observable<RxMessage<String>>>() {
      @Override
      public Observable<RxMessage<String>> call(RxMessage<String> reply) {
        return rxEventBus.send("foo", reply.body() + "ping2");
      }
    });
    Observable<RxMessage<String>> obs3 = obs2.flatMap(new Func1<RxMessage<String>, Observable<RxMessage<String>>>() {
      @Override
      public Observable<RxMessage<String>> call(RxMessage<String> reply) {
        return rxEventBus.send("foo", reply.body() + "ping3");
      }
    });
   
    assertMessageThenComplete(obs3,"ping1ping2ping3");
  }

  @Test
  // Send some messages in parallel and wait for all replies before doing something
  public void testGather() {

    final RxEventBus rxEventBus = new RxEventBus(vertx.eventBus());

    rxEventBus.<String>registerHandler("foo").subscribe(new Action1<RxMessage<String>>() {
      @Override
      public void call(RxMessage<String> msg) {
        msg.reply("pong"+msg.body());
      }
    });

    Observable<RxMessage<String>> obs1 = rxEventBus.send("foo", "A");
    Observable<RxMessage<String>> obs2 = rxEventBus.send("foo", "B");
    Observable<RxMessage<String>> obs3 = rxEventBus.send("foo", "C");
    Observable<RxMessage<String>> merged = Observable.merge(obs1, obs2, obs3);

    assertMessageThenComplete(merged.takeLast(1),"pongC");
  }

  @Test
  // Send some messages in parallel and return result of concatenating all the messages
  public void testConcatResults() {
    final RxEventBus rxEventBus = new RxEventBus(vertx.eventBus());

    rxEventBus.<String>registerHandler("foo").subscribe(new Action1<RxMessage<String>>() {
      @Override
      public void call(RxMessage<String> msg) {
        msg.reply("pong"+msg.body());
      }
    });

    Observable<RxMessage<String>> obs1 = rxEventBus.send("foo", "A");
    Observable<RxMessage<String>> obs2 = rxEventBus.send("foo", "B");
    Observable<RxMessage<String>> obs3 = rxEventBus.send("foo", "C");
    Observable<RxMessage<String>> merged = Observable.merge(obs1, obs2, obs3);
    Observable<String> result = merged.reduce("", new Func2<String, RxMessage<String>, String>() {
      @Override
      public String call(String accum, RxMessage<String> reply) {
        return accum + reply.body();
      }
    });

    RxAssert.assertSequenceThenComplete(result.takeLast(1),"pongApongBpongC");
  }

  @Test
  public void testSimpleRegisterHandler() {
    final RxEventBus rxEventBus = new RxEventBus(vertx.eventBus());

    Observable<RxMessage<String>> obs = rxEventBus.registerHandler("foo");

    assertMessageThenComplete(obs.take(1),"hello");

    // Send using core EventBus
    vertx.eventBus().send("foo", "hello");
  }

  @Test
  public void testReplyToReply() {
    final RxEventBus rxEventBus = new RxEventBus(vertx.eventBus());

    Observable<RxMessage<String>> obsReply1 = rxEventBus.registerHandler("foo");

    Observable<RxMessage<String>> obsReply2 = obsReply1.flatMap(new Func1<RxMessage<String>, Observable<RxMessage<String>>>() {
      @Override
      public Observable<RxMessage<String>> call(RxMessage<String> stringRxMessage) {
        // Reply to the message
        assertEquals("hello1", stringRxMessage.body());
        return stringRxMessage.observeReply("goodday1");
      }
    });

    Observable<RxMessage<String>> obsReply3 = obsReply2.flatMap(new Func1<RxMessage<String>, Observable<RxMessage<String>>>() {
      @Override
      public Observable<RxMessage<String>> call(RxMessage<String> stringRxMessage) {
        // Reply to the reply!
        assertEquals("hello2", stringRxMessage.body());
        return stringRxMessage.observeReply("goodday2");
      }
    });
    obsReply3.subscribe(new Action1<RxMessage<String>>() {
      @Override
      public void call(RxMessage<String> stringRxMessage) {
      }
    });

    Observable<RxMessage<String>> obsSend1 = rxEventBus.send("foo", "hello1");

    Observable<RxMessage<String>> obsSend2 = obsSend1.flatMap(new Func1<RxMessage<String>, Observable<RxMessage<String>>>() {
      @Override
      public Observable<RxMessage<String>> call(RxMessage<String> stringRxMessage) {
        // The first reply
        assertEquals("goodday1", stringRxMessage.body());
        // Now reply to the reply
        return stringRxMessage.observeReply("hello2");
      }
    });

    assertMessageThenComplete(obsSend2,"goodday2");
  }

  @Test
  public void testRetry() {

    final RxEventBus rx=new RxEventBus(vertx.eventBus());

    vertx.eventBus().registerHandler("every3", new Handler<Message<Integer>>() {

      private int times=0;

      public void handle(Message<Integer> msg) {
        if (times++%3!=2) {
          System.out.println("no!");
          msg.fail(500,"no");
          return;
        }
        System.out.println("yes!");
        msg.reply("yes");
      }
    });

    // Keep asking
    Observable<String> res=rx.<String,String>observeSend("every3", "please")
      .map(new Func1<RxMessage<String>, String>() {
        public String call(RxMessage<String> msg) {
          return msg.body();
        }
      })
      .retry(3);

    assertCountThenComplete(res, 1);
  }

  @Test
  public void testStream() {

    final RxEventBus rx=new RxEventBus(vertx.eventBus());

    vertx.eventBus().registerHandler("countdown", new Handler<Message<Integer>>() {

      public void sendBatch(final Message<Integer> msg, int from, int length) {
        JsonArray res = new JsonArray();
        for (int i = from; i < from+length; i++) {
          res.add(i);
        }
        // As long as above 0 wait for another request
        if (from > 0) {
          msg.reply(res, this);
        } else {
          msg.reply(res);
        }
      }

      public void handle(Message<Integer> msg) {
        sendBatch(msg, msg.body(), 10);
      }
    });

    Observable<Buffer> res=rx.<Integer,JsonArray>observeStream("countdown", 400)
      .map(new Func1<RxStream<Integer, JsonArray>, JsonArray>() {
        public JsonArray call(RxStream<Integer, JsonArray> s) {
          int start = s.value().get(0);
          if (start > 0) {
            s.next(start - 10);
          }
          return s.value();
        }
      })
      .map(new Func1<JsonArray, Buffer>() {
        public Buffer call(JsonArray data) {
          return new Buffer(data.encode());
        }
      });

    assertCountThenComplete(res,41);
  }

  @Test
  public void testFlow() {

    final RxEventBus rx=new RxEventBus(vertx.eventBus());

    vertx.eventBus().registerHandler("countdown", new Handler<Message<Integer>>() {

      public void sendBatch(final Message<Integer> msg, int from, int length) {
        JsonArray res = new JsonArray();
        for (int i = from; i < from+length; i++) {
          res.add(i);
        }
        // As long as above 0 wait for another request
        if (from > 0) {
          msg.reply(res, this);
        } else {
          msg.reply(res);
        }
      }

      public void handle(Message<Integer> msg) {
        sendBatch(msg, msg.body(), 10);
      }
    });

    Regulator regulator=new Regulator<>();

    Observable<RxStream<Integer,JsonArray>> res=rx.<Integer,JsonArray>observeStream("countdown", 4000)
      // Add the regulator gate here
      .lift(regulator)
      // Process the stream
      .map(new Func1<RxStream<Integer, JsonArray>, JsonArray>() {
        public JsonArray call(RxStream<Integer, JsonArray> s) {
          int start = s.value().get(0);
          if (start > 0) {
            s.next(start - 10);
          }
          return s.value();
        }
      })
      .map(new Func1<JsonArray, Buffer>() {
        public Buffer call(JsonArray data) {
          return new Buffer(data.encode());
        }
      });

    // Create a WriteStream that can only handle 5 writes per second
    RatedWriteStream out=new RatedWriteStream(vertx,5);

    assertCountThenComplete(regulator.stream(res,out),401);
  }

}
TOP

Related Classes of io.vertx.rxcore.test.integration.java.EventBusIntegrationTest

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.