Skip to content

Commit

Permalink
[core] orderbook sync
Browse files Browse the repository at this point in the history
  • Loading branch information
rizer1980 committed Jun 9, 2024
1 parent 8dad542 commit a224830
Show file tree
Hide file tree
Showing 5 changed files with 570 additions and 41 deletions.
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
package org.knowm.xchange.dto.marketdata;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.io.Serializable;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.concurrent.locks.StampedLock;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.Getter;
import org.knowm.xchange.dto.Order.OrderType;
import org.knowm.xchange.dto.trade.LimitOrder;
import org.knowm.xchange.instrument.Instrument;
Expand All @@ -18,14 +21,17 @@
public final class OrderBook implements Serializable {

private static final long serialVersionUID = -7788306758114464314L;

@JsonIgnore public final StampedLock lock = new StampedLock();
/** the asks */
@Getter
private final List<LimitOrder> asks;

/** the bids */
@Getter
private final List<LimitOrder> bids;

/** the timestamp of the orderbook according to the exchange's server, null if not provided */
@Getter
private Date timeStamp;

/**
Expand Down Expand Up @@ -113,21 +119,6 @@ private static LimitOrder withAmount(LimitOrder limitOrder, BigDecimal tradeable
return new LimitOrder(type, tradeableAmount, instrument, id, date, limit);
}

public Date getTimeStamp() {

return timeStamp;
}

public List<LimitOrder> getAsks() {

return asks;
}

public List<LimitOrder> getBids() {

return bids;
}

public List<LimitOrder> getOrders(OrderType type) {

return type == OrderType.ASK ? asks : bids;
Expand All @@ -141,23 +132,35 @@ public List<LimitOrder> getOrders(OrderType type) {
* @param limitOrder the new LimitOrder
*/
public void update(LimitOrder limitOrder) {

update(getOrders(limitOrder.getType()), limitOrder);
updateDate(limitOrder.getTimestamp());
}

// Replace the amount for limitOrder's price in the provided list.
private void update(List<LimitOrder> asks, LimitOrder limitOrder) {

int idx = Collections.binarySearch(asks, limitOrder);
if (idx >= 0) {
asks.remove(idx);
} else {
idx = -idx - 1;
}

if (limitOrder.getRemainingAmount().compareTo(BigDecimal.ZERO) != 0) {
asks.add(idx, limitOrder);
private void update(List<LimitOrder> limitOrders, LimitOrder limitOrder) {
long stamp = lock.readLock();
int idx = Collections.binarySearch(limitOrders, limitOrder);
try {
while (true) {
long writeStamp = lock.tryConvertToWriteLock(stamp);
if (writeStamp != 0L) {
stamp = writeStamp;
if (idx >= 0) limitOrders.remove(idx);
else idx = -idx - 1;
if (limitOrder.getRemainingAmount().compareTo(BigDecimal.ZERO) != 0)
limitOrders.add(idx, limitOrder);
updateDate(limitOrder.getTimestamp());
break;
} else {
lock.unlockRead(stamp);
stamp = lock.writeLock();
// here wee need to recheck idx, because it is possible that orderBook changed between
// unlockRead and lockWrite
if (recheckIdx(limitOrders, limitOrder, idx))
idx = Collections.binarySearch(limitOrders, limitOrder);
}
}
} finally {
lock.unlock(stamp);
}
}

Expand All @@ -169,22 +172,54 @@ private void update(List<LimitOrder> asks, LimitOrder limitOrder) {
* @param orderBookUpdate the new OrderBookUpdate
*/
public void update(OrderBookUpdate orderBookUpdate) {

long stamp = lock.readLock();
LimitOrder limitOrder = orderBookUpdate.getLimitOrder();
List<LimitOrder> limitOrders = getOrders(limitOrder.getType());
int idx = Collections.binarySearch(limitOrders, limitOrder);
if (idx >= 0) {
limitOrders.remove(idx);
} else {
idx = -idx - 1;
try {
while (true) {
long writeStamp = lock.tryConvertToWriteLock(stamp);
if (writeStamp != 0L) {
stamp = writeStamp;
if (idx >= 0) limitOrders.remove(idx);
else idx = -idx - 1;
if (orderBookUpdate.getTotalVolume().compareTo(BigDecimal.ZERO) != 0) {
LimitOrder updatedOrder = withAmount(limitOrder, orderBookUpdate.getTotalVolume());
limitOrders.add(idx, updatedOrder);
}
updateDate(limitOrder.getTimestamp());
break;
} else {
lock.unlockRead(stamp);
stamp = lock.writeLock();
// here wee need to recheck idx, because it is possible that orderBook changed between
// unlockRead and lockWrite
if (recheckIdx(limitOrders, limitOrder, idx))
idx = Collections.binarySearch(limitOrders, limitOrder);
}
}
} finally {
lock.unlock(stamp);
}
}

if (orderBookUpdate.getTotalVolume().compareTo(BigDecimal.ZERO) != 0) {
LimitOrder updatedOrder = withAmount(limitOrder, orderBookUpdate.getTotalVolume());
limitOrders.add(idx, updatedOrder);
private boolean recheckIdx(List<LimitOrder> limitOrders, LimitOrder limitOrder, int idx) {
if (idx >= 0) {
// if positive, null check or compare
return limitOrders.get(idx) == null || limitOrders.get(idx).compareTo(limitOrder) != 0;
} else {
// on end of array, null check or one check
if (limitOrders.size() == -idx - 1) {
return limitOrders.get(-idx - 2) == null
|| limitOrders.get(-idx - 2).compareTo(limitOrder) >= 0;
} else
// if negative, check that of limitOrders.get(reversed idx) limitOrders.get(reversed idx-1)
// and is lower and bigger than limitOrder
return (limitOrders.get(-idx - 1) == null
|| limitOrders.get(-idx - 1).compareTo(limitOrder) <= 0)
&& (limitOrders.get(-idx - 2) == null
|| limitOrders.get(-idx - 2).compareTo(limitOrder) >= 0);
}

updateDate(limitOrder.getTimestamp());
}

// Replace timeStamp if the provided date is non-null and in the future
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
package org.knowm.xchange.dto.marketdata;

import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import lombok.var;
import org.knowm.xchange.currency.CurrencyPair;
import org.knowm.xchange.dto.Order.OrderType;
import org.knowm.xchange.dto.trade.LimitOrder;
import org.knowm.xchange.instrument.Instrument;

public class ConcurrencyTest {

static Instrument inst = new CurrencyPair("BTC/USDT");

public static void main(String[] args) throws InterruptedException, ExecutionException {
OrderBook orderBook1 = new OrderBook(new Date(), initOrderBookAsks(), initOrderBookBids(),
true);
OrderBook orderBook2 = new OrderBook(new Date(), initOrderBookAsks(), initOrderBookBids(),
true);
OrderBookOld orderBookOld = new OrderBookOld(new Date(), initOrderBookAsks(),
initOrderBookBids(), true);
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(50);
newWay(orderBook1, executor);
executor.awaitTermination(100L, TimeUnit.SECONDS);
executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(50);
oldWay(orderBook2, executor);
executor.awaitTermination(100L, TimeUnit.SECONDS);
executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(50);
oldOB(orderBookOld, executor);
executor.awaitTermination(100L, TimeUnit.SECONDS);
if (orderBook1.getAsks().get(0).getOriginalAmount()
.equals(orderBook2.getAsks().get(0).getOriginalAmount())
&& orderBook1.getAsks().get(0).getOriginalAmount()
.equals(orderBookOld.getAsks().get(0).getOriginalAmount())) {
System.out.println("OrderBooks equals");
}
}


private static List<LimitOrder> initOrderBookAsks() {
List<LimitOrder> asks = new ArrayList<>();
asks.add(new LimitOrder(OrderType.ASK, new BigDecimal(1), inst, "", new Date(),
new BigDecimal(103)));
asks.add(new LimitOrder(OrderType.ASK, new BigDecimal(1), inst, "", new Date(),
new BigDecimal(102)));
asks.add(new LimitOrder(OrderType.ASK, new BigDecimal(1), inst, "", new Date(),
new BigDecimal(101)));
return asks;
}

private static List<LimitOrder> initOrderBookBids() {
List<LimitOrder> bids = new ArrayList<>();
bids.add(
new LimitOrder(OrderType.BID, new BigDecimal(1), inst, "", new Date(), new BigDecimal(99)));
bids.add(
new LimitOrder(OrderType.BID, new BigDecimal(1), inst, "", new Date(), new BigDecimal(98)));
bids.add(
new LimitOrder(OrderType.BID, new BigDecimal(1), inst, "", new Date(), new BigDecimal(97)));
return bids;
}

private static void newWay(OrderBook orderBook, ThreadPoolExecutor executor)
throws InterruptedException {
System.out.printf("OrderBook before %s%n", orderBook);
for (int i = 0; i < 50; i++) {
executor.execute(() -> updateOrderBook1(orderBook, false));
executor.execute(() -> readOrderBook(orderBook, false));
executor.execute(() -> updateOrderBook2(orderBook, false));
}
executor.shutdown();
executor.awaitTermination(100L, TimeUnit.SECONDS);
System.out.printf("OrderBook after %s%n", orderBook);
}

private static void oldWay(OrderBook orderBook, ThreadPoolExecutor executor)
throws InterruptedException {
System.out.printf("OrderBook before %s%n", orderBook);
for (int i = 0; i < 50; i++) {
executor.execute(() -> updateOrderBook1(orderBook, true));
executor.execute(() -> readOrderBook(orderBook, true));
executor.execute(() -> updateOrderBook2(orderBook, true));
}
executor.shutdown();
executor.awaitTermination(100L, TimeUnit.SECONDS);
System.out.printf("OrderBook after %s%n", orderBook);
}

private static void oldOB(OrderBookOld orderBookOld, ThreadPoolExecutor executor)
throws InterruptedException {
System.out.printf("OrderBookOld before %s%n", orderBookOld);
for (int i = 0; i < 50; i++) {
executor.execute(() -> updateOrderBookOld1(orderBookOld));
executor.execute(() -> readOrderBookOld(orderBookOld));
executor.execute(() -> updateOrderBookOld2(orderBookOld));
}
executor.shutdown();
executor.awaitTermination(100L, TimeUnit.SECONDS);
System.out.printf("OrderBookOld after %s%n", orderBookOld);
}

public static void updateOrderBook1(OrderBook orderBook, boolean oldWay) {
Random rand = new Random(123);
for (int i = 0; i < 100000; i++) {
OrderBookUpdate orderBookUpdateAsk = new OrderBookUpdate(OrderType.ASK, new BigDecimal(0),
inst, new BigDecimal(101), new Date(), new BigDecimal(rand.nextInt()));
OrderBookUpdate orderBookUpdateBid = new OrderBookUpdate(OrderType.BID, new BigDecimal(0),
inst, new BigDecimal(99), new Date(), new BigDecimal(rand.nextInt()));
if (oldWay) {
synchronized (orderBook) {
orderBook.update(orderBookUpdateAsk);
orderBook.update(orderBookUpdateBid);
}
} else {
orderBook.update(orderBookUpdateAsk);
orderBook.update(orderBookUpdateBid);
}
}
}

public static void updateOrderBookOld1(OrderBookOld orderBookOld) {
Random rand = new Random(123);
for (int i = 0; i < 100000; i++) {
OrderBookUpdate orderBookUpdateAsk = new OrderBookUpdate(OrderType.ASK, new BigDecimal(0),
inst, new BigDecimal(101), new Date(), new BigDecimal(rand.nextInt()));
OrderBookUpdate orderBookUpdateBid = new OrderBookUpdate(OrderType.BID, new BigDecimal(0),
inst, new BigDecimal(99), new Date(), new BigDecimal(rand.nextInt()));
synchronized (orderBookOld) {
orderBookOld.update(orderBookUpdateAsk);
orderBookOld.update(orderBookUpdateBid);
}
}
}

private static void updateOrderBook2(OrderBook orderBook, boolean oldWay) {
Random rand = new Random(123);
for (int i = 0; i < 100000; i++) {
LimitOrder bookUpdateAsk = new LimitOrder(OrderType.ASK, new BigDecimal(rand.nextInt()),
inst, "", new Date(), new BigDecimal(101));
LimitOrder bookUpdateBid = new LimitOrder(OrderType.BID, new BigDecimal(rand.nextInt()),
inst, "", new Date(), new BigDecimal(99));
if (oldWay) {
synchronized (orderBook) {
orderBook.update(bookUpdateAsk);
orderBook.update(bookUpdateBid);
}
} else {
orderBook.update(bookUpdateAsk);
orderBook.update(bookUpdateBid);
}
}
}

private static void updateOrderBookOld2(OrderBookOld orderBookOld) {
Random rand = new Random(123);
for (int i = 0; i < 100000; i++) {
LimitOrder bookUpdateAsk = new LimitOrder(OrderType.ASK, new BigDecimal(rand.nextInt()),
inst, "", new Date(), new BigDecimal(101));
LimitOrder bookUpdateBid = new LimitOrder(OrderType.BID, new BigDecimal(rand.nextInt()),
inst, "", new Date(), new BigDecimal(99));
synchronized (orderBookOld) {
orderBookOld.update(bookUpdateAsk);
orderBookOld.update(bookUpdateBid);
}
}
}

private static void readOrderBook(OrderBook orderBook, boolean oldWay) {
for (int i = 0; i < 1200000; i++) {
int temp = 0;
if (oldWay) {
synchronized (orderBook) {
for (LimitOrder ask : orderBook.getAsks()) {
temp += ask.hashCode();
}
for (LimitOrder bid : orderBook.getBids()) {
temp += bid.hashCode();
}
}
} else {
var stamp = orderBook.lock.readLock();
for (LimitOrder ask : orderBook.getAsks()) {
temp += ask.hashCode();
}
for (LimitOrder bid : orderBook.getBids()) {
temp += bid.hashCode();
}
orderBook.lock.unlockRead(stamp);
}
}
}

private static void readOrderBookOld(OrderBookOld orderBookOld) {
for (int i = 0; i < 1200000; i++) {
int temp = 0;
synchronized (orderBookOld) {
for (LimitOrder ask : orderBookOld.getAsks()) {
temp += ask.hashCode();
}
for (LimitOrder bid : orderBookOld.getBids()) {
temp += bid.hashCode();
}
}
}
}

}

Loading

0 comments on commit a224830

Please sign in to comment.