import {eventChannel} from 'redux-saga';
import {takeLatest} from 'redux-saga/effects';
import {put, call, take} from 'redux-saga/effects';
import { receiveMessage, fetchAllMessages } from "../actions";
import {
  ADD_INCOMING_MESSAGE_FOR_NUMBER,
  ADD_MESSAGE_FOR_NUMBER,
  CHANGE_MESSAGE_FOR_NUMBER,
  GLOBAL_SCROLL_ON,
  INITIALIZE_WEB_SOCKETS_CHANNEL,
  MARK_CONVERSATION_READ_ON_PANEL
} from "../../constants/ActionTypes";
import {select} from 'redux-saga/effects';


const wsUrl = process.env.REACT_APP_WS_URL;
const audio = new Audio('/notification.mp3');

function initWebsocket(jwt) {
  const connectWs = (emitter) => {
    const socket = new WebSocket(`${wsUrl}?Auth=${jwt}&account=${window.sessionStorage.getItem('account')}`);
    socket.onmessage = (message) => {
      return emitter(message.data);
    };
    socket.onopen = () => {
      console.log('Socket open');
    };
    socket.onerror = (error) => {
      console.error('WebSocket error', error);
    };
    socket.onclose = (error) => {
      console.error('WebSocket close', error);
      switch (error.code){
        case 1000:	// CLOSE_NORMAL
          console.log("WebSocket: closed");
          break;
        default:	// Abnormal closure
          setTimeout(() => {
            connectWs(emitter);
          }, 2000)
          break;
      }
    };
    // unsubscribe function
    return () => {
      console.log('Socket off');
      socket.close();
    }
  };

  return eventChannel(emitter => {
    return connectWs(emitter)
  })
}

function parseResponse(response) {
  try {
    return JSON.parse(response);
  } catch (error) {
    console.error('Can\'t parse WS JSON response: ', response);
    return null;
  }
}

/*
 * show notification for either unassigned or assigned me
 * do not show for assigned to team
 */
function* checkShowNotification(data) {
  if(data.payload.isSpam) {
      return;
  }
  const allConversations = yield select((state) => state.messages);
  const matchedConversation = allConversations.find(conversation => data.payload._doc.from === conversation.clientNumber);

  if (matchedConversation && matchedConversation.assignedTo && matchedConversation.assignedTo !== window.sessionStorage.getItem('sub')) {
    return; // assigned to someone else -- no notification
  }

  yield put(receiveMessage(data));
  audio.play();
}

const isUserNumber = (userPhoneNumbers, phoneNumber) => userPhoneNumbers?.find(item => item.number === phoneNumber);

function* handleWsResponse(data) {
  if (!data) {
    return;
  }

  const userPhoneNumbers = yield select((state) => state.accounts?.phoneNumbersAll);

  switch (data.type) {
    case 'SENT_MESSAGE':
      yield put({type: ADD_MESSAGE_FOR_NUMBER, message: data.payload._doc});
      yield put({type: GLOBAL_SCROLL_ON, clientNumber: data.payload._doc.clientNumber});
      break;
    case 'INCOMING_MESSAGE':
      if (yield call(isUserNumber, userPhoneNumbers, data.payload._doc.to)) {
        yield put(fetchAllMessages(false, true, false));
        yield put({type: ADD_INCOMING_MESSAGE_FOR_NUMBER, message: data.payload._doc});
        yield put({type: GLOBAL_SCROLL_ON, clientNumber: data.payload._doc.clientNumber});
        yield checkShowNotification(data);
      }
      break;
    case 'MESSAGE_STATUS':
      if (yield call(isUserNumber, userPhoneNumbers, data.payload.from)) {
        yield put({type: CHANGE_MESSAGE_FOR_NUMBER, message: data.payload});
      }
      break;
    case 'CONVERSATION_READ':
      yield put({type: MARK_CONVERSATION_READ_ON_PANEL, conversationId: data.payload.conversationId});
      break;
    default:
      console.error('WS response.type not recognised');
  }
}


function* initializeWebSocketsChannel({jwt}) {
  const channel = yield call(initWebsocket, jwt);
  while (true) {
    const data = yield take(channel);
    yield handleWsResponse(parseResponse(data));
  }
}

export default function* wsSagas() {
  yield [
    takeLatest(INITIALIZE_WEB_SOCKETS_CHANNEL, initializeWebSocketsChannel)
  ];
}
