import { nanoid } from "@reduxjs/toolkit";
import { buffers, eventChannel, type EventChannel } from "redux-saga";
import { call, delay, put, select, take, takeEvery } from "redux-saga/effects";

import { selectCognitoUser } from "@ds/modules/auth/redux/selectors";
import { selectCurrentUserProjectId } from "@ds/modules/settings/users/redux/selectors";

import { pubSubService } from "@ds/services/pubsub";
import { UnexpectedError } from "@ds/utils/errors";
import { logger } from "@ds/utils/logger";
import { isErrorLike } from "@ds/utils/type-guards/error-guards";

import { createIoTError } from "../utils/model";
import { iotEventUpdate, iotSubscribe, iotSubscribeFailed, iotUnsubscribe } from "./actions";

const IOT_RECONNECT_TIMEOUT = 5000;

let channel: EventChannel<IoTEntity> | null = null;

const getIoTTopicPath = (entity: IoTEntity): IoTTopicPath | undefined => {
  const topic = Object.getOwnPropertySymbols(entity).find(s => String(s) === "Symbol(topic)");
  if (topic) {
    const [, , , topicPath] = (entity as IoTEntity & { [topic: symbol]: string })[topic].split("/");
    return topicPath as IoTTopicPath;
  }
};

const subscribeToChannel = (tenantId: string): EventChannel<IoTEntity | IoTError> =>
  eventChannel(emitter => {
    const topics = [`ds/events/${tenantId}/#`];
    const subscriptionId = `web:${nanoid()}`;

    pubSubService.subscribe(
      subscriptionId,
      topics,
      data => emitter(data.value as IoTEntity),
      err => {
        logger.warn(err);
        emitter(createIoTError("Failed to subscribe to IoT events"));
      },
    );

    return () => {
      pubSubService.unsubscribe();
    };
  }, buffers.sliding(5));

function* subscribe() {
  if (channel !== null) {
    yield put(iotUnsubscribe());
  }

  try {
    if (!window.navigator.onLine) {
      throw new Error("Offline");
    }

    const cognitoUser = selectCognitoUser(yield select());
    if (!cognitoUser?.tenant_id) {
      throw new Error("IoT Tenant is undefined");
    }

    const projectId = selectCurrentUserProjectId(yield select());
    if (!projectId) {
      throw new Error("Project Id is undefined");
    }

    channel = yield call(subscribeToChannel, cognitoUser.tenant_id);

    while (true) {
      if (channel) {
        const entity: IoTEntity = yield take(channel);
        const topicPath = getIoTTopicPath(entity);
        if (topicPath) {
          if ("projectId" in entity && entity.projectId !== projectId) {
            continue;
          }

          yield put(iotEventUpdate(topicPath)(entity));
        }
      }
    }
  } catch (err) {
    if (isErrorLike(err)) {
      yield put(iotSubscribeFailed(err));
    } else {
      logger.error("Subscribe to IoT", err);
      yield put(iotSubscribeFailed(UnexpectedError));
    }
  }
}

function* unsubscribe() {
  try {
    pubSubService.unsubscribe();
    channel?.close();
    channel = null;
  } catch (err) {
    logger.error("Unsubscribe to IoT", err);
  }

  yield;
}

function* resubscribe() {
  yield put(iotUnsubscribe());
  yield delay(IOT_RECONNECT_TIMEOUT);
  yield put(iotSubscribe());
}

export function* rootIoTSaga() {
  yield takeEvery(iotSubscribe, subscribe);
  yield takeEvery(iotSubscribeFailed, resubscribe);
  yield takeEvery(iotUnsubscribe, unsubscribe);
}
