import {
  HubConnectionBuilder,
  LogLevel,
  HubConnectionState,
  HttpTransportType,
} from "@microsoft/signalr";
import { Helpers } from "../helpers/helpers";
import * as _ from "underscore";

/**
 * SignalR client for the dynamic data repository "wsKafka" hub.
 *
 * Builds a WebSocket-only hub connection, forwards the "price",
 * "inactivePrice", "priceStatus" and "feedStatus" hub events to the callbacks
 * given to the constructor, and exposes connect / subscribe / unsubscribe /
 * stop operations. All hub errors are logged to the console rather than
 * propagated to the caller.
 */
class DynamicDataRepositoryKafkaProxy {
  /**
   * @param {Function} [onPrice] - Receives `message[0]` of every "price" event.
   * @param {Function} [onInactivePrice] - Receives `message[0]` of every
   *   "inactivePrice" event.
   * @param {Function} [onPriceStatus] - Receives `message[0]` of every
   *   "priceStatus" event.
   * @param {Function} [onFeedStatus] - Receives `message[0]` of every
   *   "feedStatus" event.
   * @param {Function} [onReconnect] - Called without arguments after an
   *   automatic reconnect; expected to return the array of names to refresh.
   *   Any non-array result is treated as "nothing to refresh".
   */
  constructor(onPrice, onInactivePrice, onPriceStatus, onFeedStatus, onReconnect) {
    this.onPrice = onPrice;
    this.onInactivePrice = onInactivePrice;
    this.onPriceStatus = onPriceStatus;
    this.onFeedStatus = onFeedStatus;
    this.onReconnect = onReconnect;

    // Bookkeeping for the manual retry loop in connect(); stop() disarms it.
    this.retryTimer = null;
    this.shouldRetry = false;

    this.DYNAMIC_DATA_REPOSITORY_API_URL =
      Helpers.currentEnv().DYNAMIC_DATA_REPOSITORY_API_URL;
    console.debug(
      "Dynamic data repository: ",
      Helpers.currentEnvName(),
      this.DYNAMIC_DATA_REPOSITORY_API_URL
    );

    this.hubConnection = new HubConnectionBuilder()
      .withUrl(this.DYNAMIC_DATA_REPOSITORY_API_URL + "wsKafka", {
        // Negotiation may only be skipped when the transport is pinned to
        // WebSockets, which is what the next option does.
        skipNegotiation: true,
        transport: HttpTransportType.WebSockets,
      })
      .configureLogging(LogLevel.Debug)
      .withAutomaticReconnect()
      .build();

    this.hubConnection.keepAliveIntervalInMilliseconds = 60000;
    this.hubConnection.serverTimeoutInMilliseconds = 90000;

    this.hubConnection.onreconnected(() => {
      const markets = this.onReconnect ? this.onReconnect() : [];
      // Guard against callbacks that return nothing (or a non-array): calling
      // .filter on such a value would throw inside the SignalR callback.
      const names = Array.isArray(markets) ? markets : [];
      const selected = _.uniq(names.filter((x) => x));
      if (selected.length === 0) return;

      console.debug("Kafka Reconnected...", selected);
      // NOTE(review): the meaning of the trailing 8 is defined by the server
      // hub and is not visible from this file.
      this.hubConnection
        .invoke("refresh", selected, 8)
        .catch((err) => console.error("Kafka onreconnected: ", err));
    });

    this.hubConnection.onclose(() => {
      console.debug("Kafka connection closed...");
    });

    // Hub event name -> name of the callback property that receives it. The
    // property is looked up when the event fires, so callbacks reassigned
    // after construction are honoured.
    const forwardedEvents = [
      ["price", "onPrice"],
      ["inactivePrice", "onInactivePrice"],
      ["priceStatus", "onPriceStatus"],
      ["feedStatus", "onFeedStatus"],
    ];
    forwardedEvents.forEach(([eventName, callbackName]) => {
      this.hubConnection.on(eventName, (message) => {
        if (this[callbackName]) this[callbackName](message[0]);
      });
    });
  }

  /**
   * Cancels the retry scheduled by a failed connect(), if one is pending.
   */
  cancelPendingRetry() {
    if (this.retryTimer !== null) {
      clearTimeout(this.retryTimer);
      this.retryTimer = null;
    }
  }

  /**
   * Starts the hub connection if it is currently disconnected; otherwise does
   * nothing. A failed start is logged and retried after 2 seconds until it
   * succeeds or stop() is called. (withAutomaticReconnect() does not retry a
   * failed initial start, hence the manual loop.)
   *
   * @returns {Promise<void>} Resolves after the first attempt settles; never
   *   rejects because of a hub error.
   */
  async connect() {
    // An explicit connect() re-arms retries and supersedes any pending one so
    // that at most one retry timer exists at a time.
    this.shouldRetry = true;
    this.cancelPendingRetry();

    if (this.hubConnection.state !== HubConnectionState.Disconnected) return;

    try {
      await this.hubConnection.start();
    } catch (error) {
      console.assert(
        this.hubConnection.state === HubConnectionState.Disconnected
      );
      console.error("Kafka connect: ", error);
      // stop() may have been called while start() was in flight; in that case
      // the caller no longer wants a connection, so do not schedule a retry.
      if (!this.shouldRetry) return;
      this.retryTimer = setTimeout(() => {
        this.retryTimer = null;
        this.connect();
      }, 2000);
    }
  }

  /**
   * Invokes the hub's "subscribeAndOverride" method with the de-duplicated
   * names. Does nothing when the connection is not in the Connected state.
   *
   * @param {Array} names - Names to subscribe to; duplicates are removed.
   * @returns {Promise<void>} Never rejects because of a hub error (logged).
   */
  async subscribeAndOverride(names) {
    const uniqueNames = _.uniq(names);
    if (this.hubConnection.state !== HubConnectionState.Connected) return;

    try {
      // NOTE(review): the meaning of the trailing 15 is defined by the server
      // hub and is not visible from this file.
      await this.hubConnection.invoke("subscribeAndOverride", uniqueNames, 15);
    } catch (err) {
      console.error("Kafka subscribe: ", err);
    }
  }

  /**
   * Invokes the hub's "unsubscribe" method. Does nothing when the connection
   * is not in the Connected state.
   *
   * @returns {Promise<void>} Never rejects because of a hub error (logged).
   */
  async unsubscribe() {
    if (this.hubConnection.state !== HubConnectionState.Connected) return;

    try {
      await this.hubConnection.invoke("unsubscribe");
    } catch (err) {
      console.error("Kafka unsubscribe: ", err);
    }
  }

  /**
   * Stops the hub connection and disarms the connect() retry loop, so a
   * pending or in-flight connection attempt cannot re-open the connection
   * after the caller asked for it to be closed.
   *
   * @returns {Promise<void>} Never rejects because of a hub error (logged).
   */
  async stop() {
    this.shouldRetry = false;
    this.cancelPendingRetry();

    try {
      await this.hubConnection.stop();
    } catch (err) {
      console.error("Kafka stop: ", err);
    }
  }
}

export default DynamicDataRepositoryKafkaProxy;
