import {combineEpics} from 'redux-observable';
import * as commonSelectors from 'profile/store/selectors';
import {push} from 'connected-react-router';
import {bcTypes} from 'bc/services/bcTypes';
import {routeChanged as routeChangedAction, setRoute} from 'common/store/actions';
import {getDateValue, rangeTypes} from 'common/utils/dateRangeService';
import {getFormattedDateTime} from 'common/utils/dateService';
import {getUniqueId} from 'common/utils/guid';
import {info} from 'common/utils/notifications/notificationsService';
import {makeAsyncEpic} from 'common/utils/simplifiedAsync';
import {getStreamQueryId, isEditRunning, pollingIntervals, resolutions} from 'bc/services/dataStreamService';
import {bcInfoCodes} from 'bc/services/bcErrorCodes';
import {get, isEqual} from 'lodash';
import {Observable} from 'rxjs/Observable';
import {fetchCustomerMetricsCount} from 'metrics/store/actions';
import {deleteImpactsByOrigin} from 'impact/store/actions';
import {ORIGIN_TYPES} from 'impact/services/constants';
import * as selectors from '../selectors';
import * as api from '../../services/api';
import * as actions from '../actions';
import 'rxjs/add/operator/distinctUntilChanged';
import 'rxjs/add/operator/debounceTime';
import 'rxjs/add/operator/throttleTime';
import 'rxjs/add/observable/interval';
import 'rxjs/add/operator/mapTo';
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/catch';
import 'rxjs/add/operator/filter';
import 'rxjs/add/observable/of';

const EMPTY_ARRAY = [];

const isBcEnabledFilterCall = (action, getState) => !!commonSelectors.getOrganizationSettingsBC(getState()).enabled;

const fetchDataStreams = makeAsyncEpic(actions.fetchDataStreams, api.fetchDataStreams, isBcEnabledFilterCall);
const updateStreamName = makeAsyncEpic(actions.updateStreamName, api.updateStreamName);
const updateAgentOwner = makeAsyncEpic(actions.updateAgentOwner, api.updateAgentOwner);
const duplicateStream = makeAsyncEpic(actions.duplicateStream, api.duplicateStream);
const deletePipelineStream = makeAsyncEpic(actions.deletePipelineStream, api.deletePipelineStream);

const fetchStreamDashboards = makeAsyncEpic(actions.fetchStreamDashboards, api.fetchStreamDashboards);
const fetchStreamDashboardsV2 = makeAsyncEpic(actions.fetchStreamDashboardsV2, api.fetchStreamDashboards);
const fetchStreamAlerts = makeAsyncEpic(actions.fetchStreamAlerts, api.fetchStreamAlerts);
const fetchStreamComposites = makeAsyncEpic(actions.fetchStreamComposites, api.fetchStreamComposites);

// when there is a slow network - fetching streams and then manipulating (add/remove) stream may cause inconsistance
const refetchDataStreamsIfDuringFetchStreamsArrayWasChanged = (action$, {getState}) =>
  action$.ofType(actions.createDataStream.TYPE, actions.deleteDataStream.TYPE).flatMap(() => {
    if (selectors.getIsDataStreamsLoading(getState())) {
      return [actions.fetchDataStreams()];
    }
    return [];
  });

const refrashPipeline = (action$) =>
  action$.ofType(actions.deletePipelineStream.TYPE).flatMap(() => [actions.fetchDataStreams()]);

const deleteImpacts = (action$) =>
  action$
    .ofType(actions.deleteDataStream.TYPE)
    .flatMap(({payload}) => [deleteImpactsByOrigin({originId: payload.id, originType: ORIGIN_TYPES.stream})]);

const createDataStream = (action$, {getState}) =>
  action$
    .ofType(actions.createDataStream.TYPE)
    // eslint-disable-next-line complexity
    .switchMap((action) => {
      const owner = commonSelectors.getProfileId(getState());
      const newStream = {
        type: action.payload.type,
        dataSourceId: action.payload.dataSourceId,
        name: '',
        dimensions: [],
        metrics: [],
        basedOnTemplateId: '',
        pollingInterval: pollingIntervals.daily.key,
        owner,
      };
      switch (action.payload.type) {
        case bcTypes.google_ads.type: {
          Object.assign(newStream, {
            historicalDateRange: {constRange: rangeTypes.m1.key},
            pollingResolution: resolutions.days.key,
            delayMinutes: 60,
          });
          break;
        }
        case bcTypes.google_search.type: {
          Object.assign(newStream, {
            pollingResolution: resolutions.days.key,
            historicalDateRange: {constRange: rangeTypes.m3.key},
            delayMinutes: 1920,
          });
          break;
        }
        case bcTypes.facebook_ads.type: {
          Object.assign(newStream, {
            historicalDateRange: {constRange: rangeTypes.m1.key},
            pollingResolution: resolutions.days.key,
            timeZone: 'UTC',
            adAccountId: null,
            reportType: null,
            basedOnTemplateId: null,
            delayMinutes: 5,
          });
          break;
        }
        case bcTypes.google_analytics.type: {
          Object.assign(newStream, {
            historicalDateRange: {constRange: rangeTypes.m1.key},
            pollingResolution: resolutions.days.key,
            delayMinutes: 120,
            maxGoldenDelayMinutes: 120,
          });
          break;
        }
        case bcTypes.google_analytics_ga4.type: {
          Object.assign(newStream, {
            historicalDateRange: {constRange: rangeTypes.m1.key},
            pollingResolution: resolutions.days.key,
            delayMinutes: 90,
          });
          break;
        }
        case bcTypes.local_file.type: {
          Object.assign(newStream, {
            historicalDateRange: null,
          });
          break;
        }
        case bcTypes.s3.type: {
          Object.assign(newStream, {
            historicalDateRange: getDateValue(rangeTypes.m3.key, true),
            timeZone: 'UTC',
            maxMissingFiles: 0,
            path: '',
            fileNameSuffix: '',
            fileNamePrefix: '',
            fileNamePattern: 'yyyyMMddHH',
          });
          break;
        }
        case bcTypes.athena.type: {
          Object.assign(newStream, {
            historicalDateRange: getDateValue(rangeTypes.m3.key, true),
            inputLocation: '',
            partitionFolderFormat: '',
            inputFormat: 'Parquet',
            compression: '',
            timeZone: 'UTC',
            maxBackFillIntervals: 0,
            delayMinutes: 15,
          });
          break;
        }
        case bcTypes.google_storage.type: {
          Object.assign(newStream, {
            historicalDateRange: getDateValue(rangeTypes.m3.key, true),
            timeZone: 'UTC',
            maxMissingFiles: 0,
            path: '',
            fileNameSuffix: '',
            fileNamePrefix: '',
            fileNamePattern: 'yyyyMMddHH',
          });
          break;
        }
        case bcTypes.segment.type: {
          Object.assign(newStream, {
            delayMinutes: 5,
          });
          break;
        }
        case bcTypes.mysql.type:
        case bcTypes.mariadb.type:
        case bcTypes.redshift.type:
        case bcTypes.snowflake.type:
        case bcTypes.oracle.type:
        case bcTypes.mssql.type:
        case bcTypes.psql.type:
        case bcTypes.pinot.type:
        case bcTypes.databricks.type:
        case bcTypes.teradata.type:
        case bcTypes.athena_sql.type:
        case bcTypes.timestream.type: {
          Object.assign(newStream, {
            historicalDateRange: {constRange: rangeTypes.m1.key},
            timeZone: 'UTC',
            delayMinutes: 5,
            maxBackFillIntervals: 3,
            timestampType: 'timestamp',
            missingDimPolicy: {
              action: 'ignore',
              fill: 'unknown',
            },
          });
          break;
        }
        case bcTypes.adobe.type: {
          Object.assign(newStream, {
            historicalDateRange: {constRange: rangeTypes.m1.key},
            delayMinutes: 5,
            pollingResolution: resolutions.days.key,
          });
          break;
        }
        case bcTypes.bigquery.type: {
          Object.assign(newStream, {
            projectId: null,
            defaultDataset: null,
            historicalDateRange: {constRange: rangeTypes.d3.key},
            delayMinutes: 5,
            maxBackFillIntervals: 0,
            useLegacySQL: true,
            timeZone: 'UTC',
          });
          break;
        }
        case bcTypes.mparticle.type: {
          Object.assign(newStream, {
            eventType: null,
          });
          break;
        }
        case bcTypes.kinesis.type:
        case bcTypes.eventhubs.type: {
          Object.assign(newStream, {
            recordType: '',
            path: '',
            value: '',
            pollingInterval: pollingIntervals.m1.key,
            delayMinutes: 5,
          });
          break;
        }
        case bcTypes.salesforce.type: {
          Object.assign(newStream, {
            historicalDateRange: null,
            timeZone: 'UTC',
            delayMinutes: -1,
          });
          break;
        }
        case bcTypes.aws_cur.type: {
          Object.assign(newStream, {
            timeZone: 'UTC',
          });
          break;
        }
        case bcTypes.coralogix.type: {
          Object.assign(newStream, {
            metricName: null,
            filter: [],
            historicalDateRange: {constRange: rangeTypes.d1.key},
            delayMinutes: 5,
            aggrInterval: 5,
          });
          break;
        }
        case bcTypes.newrelic.type: {
          Object.assign(newStream, {
            accountId: null,
            eventType: null,
            queryType: 'TABLE',
            historicalDateRange: getDateValue(rangeTypes.m3.key, true),
            pollingInterval: pollingIntervals.m1.key,
            delayMinutes: 5,
          });
          break;
        }
        case bcTypes.datadog.type: {
          Object.assign(newStream, {
            metricName: null,
            filter: [],
            historicalDateRange: {constRange: rangeTypes.d1.key},
            pollingInterval: pollingIntervals.m5.key,
            aggregation: 'avg',
            rollupAggregation: 'avg',
            resolutionMinutes: 5,
          });
          break;
        }
        case bcTypes.mxpnl.type: {
          Object.assign(newStream, {
            historicalDateRange: getDateValue(rangeTypes.m1.key, true),
            timeZone: 'UTC',
            maxBackFillIntervals: 0,
            delayMinutes: 5,
          });
          break;
        }
        case bcTypes.sumologic.type: {
          Object.assign(newStream, {
            historicalDateRange: getDateValue(rangeTypes.m1.key, true),
            timeZone: 'UTC',
            delayMinutes: 15,
            notificationDelay: '1h',
            missingDimPolicy: {
              action: 'ignore',
              fill: 'unknown',
            },
          });
          break;
        }
        default:
          break;
      }
      return api.createDataStream({payload: newStream});
    })
    .map((payload) => actions.createDataStream.success(payload))
    .catch((error) => Observable.of(actions.createDataStream.failure(error)));

const saveDataStreamSuccess = (action$) =>
  action$
    .ofType(actions.createDataStream.success.TYPE, actions.duplicateStream.success.TYPE)
    .flatMap((action) => [push(`/bc/data-streams/${action.payload.id}`)]);

const deleteDataStream = makeAsyncEpic(actions.deleteDataStream, api.deleteDataStream);

const createEventStream = (action$) =>
  action$
    .ofType(actions.createEventStream.TYPE)
    .flatMap(({payload}) => [push(`/data-collectors/event-stream/edit?sourceId=${payload.sourceId}`)]);

const editEventStream = (action$) =>
  action$
    .ofType(actions.editEventStream.TYPE)
    .flatMap(({payload}) => [push(`/data-collectors/event-stream/edit?streamId=${payload.streamId}`)]);

const previewEventStream = (action$) =>
  action$
    .ofType(actions.previewEventStream.TYPE)
    .flatMap(({payload}) => [push(`/data-collectors/event-stream/preview?streamId=${payload.streamId}`)]);

const editDataStream = (action$) =>
  action$.ofType(actions.editDataStream.TYPE).flatMap(({payload}) => [push(`/bc/data-streams/${payload}`)]);

const editLiveStream = (action$) =>
  action$.ofType(actions.editLiveStream.TYPE).flatMap(() => [push('/bc/data-streams/EditRunning')]);

const previewDataStream = (action$) =>
  action$
    .ofType(actions.previewDataStream.TYPE)
    .flatMap((action) => [push(`/bc/data-streams/${action.payload}/preview`)]);

const updateDataStream = (action$) =>
  action$.ofType(actions.updateDataStream.TYPE).switchMap((action) => {
    if (isEditRunning(action.payload.id)) {
      return [actions.updateDataStream.success(action.payload, action.meta)];
    }
    return api
      .updateDataStream(action)
      .map((payload) => actions.updateDataStream.success(payload, action.meta))
      .catch((error) => Observable.of(actions.updateDataStream.failure(error)));
  });

const updateDataStreamScheduling = makeAsyncEpic(actions.updateDataStreamScheduling, api.updateDataStreamScheduling);

const autoSaveDataStream1 = (action$) =>
  action$
    .ofType(
      actions.setSelectedStreamKeyVal.TYPE,
      actions.setStreamSchemaColumnKeyVal.TYPE,
      actions.setSelectedStreamGaAccount.TYPE,
      actions.setSelectedStreamGaProperty.TYPE,
      actions.setSelectedStreamGaView.TYPE,
      actions.setSelectedStreamGaSegment.TYPE,
      // up to 6 actions only
    )
    .flatMap(() => [actions.autoSaveDataStreamDebounced()]);

const autoSaveDataStream2 = (action$) =>
  action$
    .ofType(
      actions.setFileStreamDiametricsTimeDefinition.TYPE,
      actions.setFileStreamClearAllDiametrics.TYPE,
      actions.setSqlStreamClearAllDiametrics.TYPE,
      actions.setGoogleStorageStreamKeyVal.TYPE,
      actions.setGoogleStorageStreamUiKeyVal.TYPE,
      actions.setSqlStreamSchemaName.TYPE,
      // up to 6 actions only
    )
    .flatMap(() => [actions.autoSaveDataStreamDebounced()]);

const autoSaveDataStream3 = (action$) =>
  action$
    .ofType(
      actions.setAdobeStreamUiBookmarkId.TYPE,
      actions.setBigQueryStreamDataSet.TYPE,
      actions.setBigQueryStreamProjectId.TYPE,
      actions.setBigQueryStreamClearAllDiametrics.TYPE,
      actions.setMParticleStreamEventType.TYPE,
      actions.setMParticleStreamClearAllDiametrics.TYPE,
      // up to 6 actions only
    )
    .flatMap(() => [actions.autoSaveDataStreamDebounced()]);

const autoSaveDataStream4 = (action$) =>
  action$
    .ofType(
      actions.setKinesisStreamUiKeyVal.TYPE,
      actions.setSalesforceStreamKeyVal.TYPE,
      actions.setKinesisDiametricsClearAll.TYPE,
      actions.setKinesisDiametricsTimeDefinition.TYPE,
      actions.setS3StreamKeyVal.TYPE,
      actions.setS3StreamUiKeyVal.TYPE,
      // up to 6 actions only
    )
    .flatMap(() => [actions.autoSaveDataStreamDebounced()]);

const autoSaveDataStream5 = (action$) =>
  action$
    .ofType(
      actions.setSelectedGoogleAdsTemplate.TYPE,
      actions.setSelectedStreamUiKeyVal.TYPE,
      actions.setParquetStreamDiametricsChange.TYPE,
      actions.removeParquetStreamDiametrics.TYPE,
      actions.setParquetStreamClearAllDiametrics.TYPE,
      actions.setParquetStreamDiametricsTimeDefinition.TYPE,
      // up to 6 actions only
    )
    .flatMap(() => [actions.autoSaveDataStreamDebounced()]);

const autoSaveDataStream6 = (action$) =>
  action$
    .ofType(
      actions.setSalesforceStreamClearAllDiametrics.TYPE,
      actions.setSqlTableData.TYPE,
      actions.setS3StreamUiKeyVal.TYPE,
      actions.setBigQueryStreamQuery.TYPE,
      actions.setSalesforceObjectData.TYPE,
      actions.setSelectedFacebookAdsTemplate.TYPE,
      // up to 6 actions only
    )
    .flatMap(() => [actions.autoSaveDataStreamDebounced()]);

const autoSaveDataStream7 = (action$) =>
  action$
    .ofType(
      actions.setAwsTemplates.TYPE,
      actions.setAwsCurManifest.TYPE,
      actions.setAwsCurReports.TYPE,
      actions.applyAwsCurTemplate.TYPE,
      actions.setSegmentMessageTypes.TYPE,
      actions.setSegmentStreamClearAllDiametrics.TYPE,
      // up to 6 actions only
    )
    .flatMap(() => [actions.autoSaveDataStreamDebounced()]);

const autoSaveDataStream8 = (action$) =>
  action$
    .ofType(
      actions.setGoogleAuctionsDiametricsStreamAnalysisSchema.TYPE,
      actions.removeAuctionStreamDiametrics.TYPE,
      actions.setAuctionStreamDiametricsChange.TYPE,
      actions.setAuctionStreamClearAllDiametrics.TYPE,
      actions.setIsStreamEditEnabled.TYPE,
      // up to 6 actions only
    )
    .flatMap(() => [actions.autoSaveDataStreamDebounced()]);

const autoSaveDataStream9 = (action$) =>
  action$
    .ofType(
      actions.applyCoralogixStreamFilters.TYPE,
      actions.setCoralogixDiametricsChange.TYPE,
      actions.setCoralogixLabels.TYPE,
      actions.setCoralogixMeasures.TYPE,
      actions.setCoralogixDiametricsClearAll.TYPE,
      actions.removeCoralogixDiametrics.TYPE,
      // up to 6 actions only
    )
    .flatMap(() => [actions.autoSaveDataStreamDebounced()]);

const autoSaveDataStream10 = (action$) =>
  action$
    .ofType(
      actions.setNewrelicStreamClearAllDiametrics.TYPE,
      actions.removeNewrelicStreamDiametrics.TYPE,
      actions.setNewrelicStreamDiametricsChange.TYPE,
      actions.setDatadogDescribeMetric.TYPE,
      actions.setMixpanelDiametricsStreamAnalysisSchema.TYPE,
      // up to 6 actions only
    )
    .flatMap(() => [actions.autoSaveDataStreamDebounced()]);

const autoSaveDataStream11 = (action$) =>
  action$
    .ofType(
      actions.setSelectedStreamGA4Account.TYPE,
      actions.setSelectedStreamGA4Property.TYPE,
      actions.setKinesisIsExternalTimeUsed.TYPE,
      // up to 6 actions only
    )
    .flatMap(() => [actions.autoSaveDataStreamDebounced()]);

const autoSaveDataStreamDebounced = (action$, {getState}) =>
  action$
    .ofType(actions.autoSaveDataStreamDebounced.TYPE)
    .switchMap(() => [selectors.getSelectedDataStream(getState())])
    .distinctUntilChanged(isEqual)
    .debounceTime(1500)
    .flatMap((stream) => [actions.updateDataStream(stream)]);

const setSelectedEditor = (action$) =>
  action$
    .ofType(actions.setSelectedEditor.TYPE)
    .throttleTime(100)
    .flatMap((action) => [actions.setSelectedEditorBounced(action.payload)]);

const openSchedulerEditor = (action$) =>
  action$
    .ofType(actions.openSchedulerEditor.TYPE)
    .throttleTime(100)
    .flatMap((action) => [actions.openSchedulerEditorBounced(action.payload)]);

const startDataStream = (action$, {getState}) =>
  action$.ofType(actions.startDataStream.TYPE).switchMap((action) => {
    if (isEditRunning(action.payload.id)) {
      const stream = selectors.getSelectedDataStream(getState());
      const newAction = {...action, payload: {...action.payload, ...stream, id: stream.uiState.id}};
      return api
        .startLiveDataStream(newAction)
        .map((payload) => actions.startDataStream.success(payload))
        .catch((error) => Observable.of(actions.startDataStream.failure(error)));
    }
    return api
      .startDataStream(action)
      .map((payload) => actions.startDataStream.success(payload))
      .catch((error) => Observable.of(actions.startDataStream.failure(error)));
  });

const pauseResumeDataStream = makeAsyncEpic(actions.pauseResumeDataStream, api.startDataStream);
const dataStreamStarted = (action$) =>
  action$.ofType(actions.startDataStream.success.TYPE).flatMap((action) => {
    const infoMessage = action.payload.type === bcTypes.local_file.type ? bcInfoCodes[11001] : bcInfoCodes[11002];

    return [info(infoMessage), push('/bc/data-manager')];
  });

const getStreamPreviewData = (action$, {getState}) =>
  action$.ofType(actions.getStreamPreviewData.TYPE).flatMap(() => {
    const stream = selectors.getSelectedDataStream(getState());
    if (isEditRunning(stream.id)) {
      return [
        actions.fetchLiveDataStreamPreview(stream),
        fetchCustomerMetricsCount(),
        actions.fetchLiveDataStreamStats(stream),
      ];
    }

    return [
      actions.fetchDataStreamPreview(selectors.getSelectedDataStream(getState())),
      fetchCustomerMetricsCount(),
      actions.fetchDataStreamStats(selectors.getSelectedDataStream(getState())),
    ];
  });

const fetchLiveDataStreamPreview = (action$, {getState}) =>
  action$.ofType(actions.fetchLiveDataStreamPreview.TYPE).switchMap((action) => {
    const tempStream = {...action, payload: {...action.payload, id: action.payload.uiState.id}};
    return api
      .fetchLiveDataStreamPreview(tempStream)
      .flatMap((payload) => {
        const userTimeZoneName = commonSelectors.getTimeZoneName(getState());
        const rows = payload.rows.map((p) => [getFormattedDateTime(p.time, userTimeZoneName), ...p.row]);
        const columns = [
          {
            sourceColumn: 'timeStamp',
            name: 'Date',
            type: 'date',
          },
          ...payload.schema.columns,
        ];
        return [
          actions.fetchDataStreamPreview.success({
            rows,
            columns,
          }),
        ];
      })
      .catch((error) => Observable.of(actions.fetchDataStreamPreview.failure(error)));
  });

const fetchLiveDataStreamStats = (action$) =>
  action$.ofType(actions.fetchLiveDataStreamStats.TYPE).switchMap((action) => {
    const tempStream = {...action, payload: {...action.payload, id: action.payload.uiState.id}};
    return api
      .fetchLiveDataStreamStats(tempStream)
      .flatMap((payload) => [actions.fetchDataStreamStats.success(payload)])
      .catch((error) => Observable.of(actions.fetchDataStreamStats.failure(error)));
  });

const fetchDataStreamPreview = (action$, {getState}) =>
  action$.ofType(actions.fetchDataStreamPreview.TYPE).switchMap((action) =>
    api
      .fetchDataStreamPreview(action)
      .flatMap((payload) => {
        const userTimeZoneName = commonSelectors.getTimeZoneName(getState());
        const rows = payload.rows.map((p) => [getFormattedDateTime(p.time, userTimeZoneName), ...p.row]);
        const columns = [
          {
            sourceColumn: 'timeStamp',
            name: 'Date',
            type: 'date',
          },
          ...payload.schema.columns,
        ];
        return [
          actions.fetchDataStreamPreview.success({
            rows,
            columns,
          }),
        ];
      })
      .catch((error) => Observable.of(actions.fetchDataStreamPreview.failure(error))),
  );

const fetchDashAndAlertTitles = makeAsyncEpic(actions.fetchDashAndAlertTitles, api.fetchDashAndAlertTitles);

const fetchDataStreamStats = makeAsyncEpic(actions.fetchDataStreamStats, api.fetchDataStreamStats);
const fetchDataStreamCardinality = makeAsyncEpic(actions.fetchDataStreamCardinality, api.fetchDataStreamCardinality);
const fetchStreamHistoryLogs = makeAsyncEpic(actions.fetchStreamHistoryLogs, api.fetchStreamHistoryLogs);
const fetchStreamLastRun = makeAsyncEpic(actions.fetchStreamLastRun, api.fetchStreamLastRun);
const fetchTransformFunctions = makeAsyncEpic(
  actions.fetchTransformFunctions,
  api.fetchTransformFunctions,
  (action, getState) => !selectors.getTransformFunctionsItems(getState()).length,
);

// gets sources ids used by items recursively
const getSourceIdsArr = (sourcesIdArr, item) => {
  if (item.sourceColumn !== undefined) {
    if (!sourcesIdArr.includes(item.sourceColumn)) {
      sourcesIdArr.push(item.sourceColumn);
    }
  } else if (item.transform && item.transform.input) {
    item.transform.input.forEach((a) => {
      getSourceIdsArr(sourcesIdArr, a);
    });
  }
  return sourcesIdArr;
};

const setGaDataStreamSchema = (action$, {getState}) =>
  action$.ofType(actions.setSelectedStreamMetricsAndDimensions.TYPE).flatMap((action) => {
    const getEmptyArray = () => (action.payload.metrics.length || action.payload.dimensions.length ? [] : EMPTY_ARRAY);
    const stream = selectors.getSelectedDataStream(getState());
    const res = {
      schema: {
        columns: getEmptyArray(),
        sourceColumns: getEmptyArray(),
      },
    };
    const metricsMeta = selectors.getGoogleAnalyticsMetaMetrics(getState());
    action.payload.metrics.forEach((item) => {
      const metric = metricsMeta.find((m) => m.id === item);
      res.schema.columns.push({
        id: getUniqueId(),
        sourceColumn: metric.id,
        name: metric.uiName,
        targetType: metric.aggregation,
        type: 'metric',
      });
      res.schema.sourceColumns.push({
        id: metric.id,
        name: metric.id,
      });
    });

    const dimensionsMeta = selectors.getGoogleAnalyticsMetaDimensions(getState());
    action.payload.dimensions.forEach((item) => {
      if (item === 'ga:segment') {
        res.schema.columns.push({
          id: getUniqueId(),
          sourceColumn: 'ga:segment',
          name: 'Segment',
          type: 'dimension',
          hidden: false,
        });
        res.schema.sourceColumns.push({
          id: 'ga:segment',
          name: 'ga:segment',
        });
      } else {
        const dimension = dimensionsMeta.find((m) => m.id === item);
        res.schema.columns.push({
          id: getUniqueId(),
          sourceColumn: dimension.id,
          name: dimension.uiName,
          type: 'dimension',
        });
        res.schema.sourceColumns.push({
          id: dimension.id,
          name: dimension.id,
        });
      }
    });

    // compare and copy the user created columns from the current schema
    if (stream.schema && stream.schema.columns) {
      stream.schema.columns.forEach((item) => {
        if (item.transform && item.transform.name === 'const') {
          // user added const column
          res.schema.columns.push({...item});
        }
      });
    }
    return [actions.setSelectedStreamKeyVal(res)];
  });

const setAdobeDataStreamSchema = (action$) =>
  action$.ofType(actions.setAdobeStreamMetricsAndDimensions.TYPE).flatMap((action) => {
    const getEmptyArray = () => (action.payload.metrics.length || action.payload.dimensions.length ? [] : EMPTY_ARRAY);
    const res = {
      metrics: getEmptyArray(),
      dimensions: getEmptyArray(),
      schema: {
        columns: getEmptyArray(),
        sourceColumns: getEmptyArray(),
      },
    };
    action.payload.metrics.forEach((metric) => {
      res.schema.columns.push({
        id: getUniqueId(),
        sourceColumn: metric.id,
        name: metric.id,
        type: 'metric',
      });
      res.schema.sourceColumns.push({
        id: metric.id,
        name: metric.id,
      });
      res.metrics.push(metric.id);
    });

    action.payload.dimensions.forEach((dimension) => {
      res.schema.columns.push({
        id: getUniqueId(),
        sourceColumn: dimension.id,
        name: dimension.id,
        type: 'dimension',
      });
      res.schema.sourceColumns.push({
        id: dimension.id,
        name: dimension.id,
      });
      res.dimensions.push(dimension.id);
    });

    return [actions.setSelectedStreamKeyVal(res)];
  });

const setGoogleStorageStreamDiametricsAnalysisSchema = (action$, {getState}) =>
  action$.ofType(actions.setGoogleStorageStreamDiametricsAnalysisSchema.TYPE).flatMap(() => {
    const stream = selectors.getSelectedDataStream(getState());
    const getEmptyArray = () => (stream.metrics.length || stream.dimensions.length ? [] : EMPTY_ARRAY);
    const res = {
      schema: {
        columns: getEmptyArray(),
        sourceColumns: getEmptyArray(),
      },
    };
    const {fileSchema} = stream.uiState.analysisResult;
    stream.metrics.forEach((item) => {
      const metric = fileSchema.find((m) => m.index === item);
      res.schema.columns.push({
        id: getUniqueId(),
        sourceColumn: metric.index,
        name: metric.name,
        type: 'metric',
      });
      res.schema.sourceColumns.push({
        id: metric.index,
        index: metric.index,
      });
    });

    stream.dimensions.forEach((item) => {
      const dimension = fileSchema.find((m) => m.index === item);
      res.schema.columns.push({
        id: getUniqueId(),
        sourceColumn: dimension.index,
        name: dimension.name,
        type: 'dimension',
      });
      res.schema.sourceColumns.push({
        id: dimension.index,
        index: dimension.index,
      });
    });

    return [actions.setSelectedStreamKeyVal(res)];
  });

const setFileDiametricsDataStreamSchema = (action$, {getState}) =>
  action$.ofType(actions.setFileStreamDiametricsChange.TYPE, actions.removeFileStreamDiametrics.TYPE).flatMap(() => {
    const stream = selectors.getSelectedDataStream(getState());

    const {fileSchema} = stream.uiState.analysisResult;
    const schemaModifications = {
      schema: {...stream.schema},
    };

    // update existing schema items
    // eslint-disable-next-line complexity
    stream.schema.columns.forEach((item) => {
      if (
        item.sourceColumn !== undefined ||
        (item.transform &&
          item.transform.input &&
          item.transform.input.length === 1 &&
          getSourceIdsArr([], item).length === 1)
      ) {
        const sourceColumn = item.sourceColumn || getSourceIdsArr([], item)[0];
        if (stream.metrics.includes(parseInt(sourceColumn, 10))) {
          if (item.type === 'dimension') {
            // delete any copy of this item
            schemaModifications.schema.columns = schemaModifications.schema.columns.filter(
              (a) => sourceColumn !== a.sourceColumn || item.id === a.id,
            );
          }
          if (item.type !== 'metric') {
            item.type = 'metric';
            item.sourceColumn = sourceColumn;
            if (item.transform) {
              /*  eslint no-param-reassign: "off" */
              delete item.transform;
            }
          }
        } else if (stream.dimensions.includes(parseInt(sourceColumn, 10))) {
          if (item.type !== 'dimension') {
            item.type = 'dimension';
            item.sourceColumn = sourceColumn;
            if (item.transform) {
              /*  eslint no-param-reassign: "off" */
              delete item.transform;
            }
            if (item.targetType) {
              /*  eslint no-param-reassign: "off" */
              delete item.targetType;
            }
            if (item.metricTags) {
              /*  eslint no-param-reassign: "off" */
              delete item.metricTags;
            }
          }
        } else {
          schemaModifications.schema.columns = schemaModifications.schema.columns.filter(
            (a) => (a.sourceColumn !== undefined ? a.sourceColumn : getSourceIdsArr([], a)[0]) !== sourceColumn,
          );
          schemaModifications.schema.sourceColumns = schemaModifications.schema.sourceColumns.filter(
            (a) => parseInt(a.id, 10) !== parseInt(sourceColumn, 10),
          );
        }
      } else {
        // user created columns check if source still exist
        if (item.transform && item.transform.input && item.transform.input.length > 1 && item.type === 'dimension') {
          getSourceIdsArr([], item).forEach((id) => {
            /*  eslint no-lonely-if: "off" */
            if (!stream.dimensions.includes(parseInt(id, 10))) {
              schemaModifications.schema.columns = schemaModifications.schema.columns.filter((a) => a.id !== item.id);
            }
          });
        }
      }
    });

    // check for unAssigned items added to metrics or dimensions
    fileSchema.forEach((item) => {
      if (stream.metrics.includes(item.index) || stream.dimensions.includes(item.index)) {
        const foundItem = schemaModifications.schema.sourceColumns.find((a) => item.index === a.index);
        if (!foundItem) {
          schemaModifications.schema.columns.push({
            id: getUniqueId(),
            sourceColumn: item.index,
            name: item.name,
            type: stream.metrics.includes(item.index) ? 'metric' : 'dimension',
          });
          schemaModifications.schema.sourceColumns.push({
            id: item.index,
            index: item.index,
          });
        }
      }
    });

    return [actions.setSelectedStreamKeyVal(schemaModifications)];
  });

const setKinesisDiametricsDataStreamSchema = (action$, {getState}) =>
  action$.ofType(actions.seKinesisDiametricsChange.TYPE, actions.removeKinesisDiametrics.TYPE).flatMap(() => {
    const stream = selectors.getSelectedDataStream(getState());
    const tableMeta = stream.uiState.analysisResult.streamSchema;
    const schemaModifications = {
      schema: {...stream.schema},
    };

    // update existing schema items
    // eslint-disable-next-line complexity
    stream.schema.columns.forEach((item) => {
      if (
        item.sourceColumn !== undefined ||
        (item.transform &&
          item.transform.input &&
          item.transform.input.length === 1 &&
          getSourceIdsArr([], item).length === 1)
      ) {
        const sourceColumn = item.sourceColumn || getSourceIdsArr([], item)[0];
        if (stream.metrics.includes(sourceColumn)) {
          if (item.type === 'dimension') {
            // delete any copy of this item
            schemaModifications.schema.columns = schemaModifications.schema.columns.filter(
              (a) => sourceColumn !== a.sourceColumn || item.name === a.name,
            );
          }
          if (item.type !== 'metric') {
            item.type = 'metric';
            item.sourceColumn = sourceColumn;
            if (item.transform) {
              /*  eslint no-param-reassign: "off" */
              delete item.transform;
            }
          }
        } else if (stream.dimensions.includes(sourceColumn)) {
          if (item.type !== 'dimension') {
            item.type = 'dimension';
            item.sourceColumn = sourceColumn;
            if (item.transform) {
              /*  eslint no-param-reassign: "off" */
              delete item.transform;
            }
            if (item.targetType) {
              /*  eslint no-param-reassign: "off" */
              delete item.targetType;
            }
            if (item.metricTags) {
              /*  eslint no-param-reassign: "off" */
              delete item.metricTags;
            }
          }
        } else {
          schemaModifications.schema.columns = schemaModifications.schema.columns.filter(
            (a) => (a.sourceColumn !== undefined ? a.sourceColumn : getSourceIdsArr([], a)[0]) !== sourceColumn,
          );
          schemaModifications.schema.sourceColumns = schemaModifications.schema.sourceColumns.filter(
            (a) => a.path !== sourceColumn,
          );
        }
      } else {
        // user created columns check if source still exist
        if (item.transform && item.transform.input && item.transform.input.length > 1 && item.type === 'dimension') {
          getSourceIdsArr([], item).forEach((name) => {
            /*  eslint no-lonely-if: "off" */
            if (!stream.dimensions.includes(name)) {
              schemaModifications.schema.columns = schemaModifications.schema.columns.filter(
                (a) => a.name !== item.name,
              );
            }
          });
        }
      }
    });

    // check for unAssigned items added to metrics or dimensions
    tableMeta.forEach((item) => {
      if (stream.metrics.includes(item.path) || stream.dimensions.includes(item.path)) {
        const foundItem = schemaModifications.schema.sourceColumns.find((a) => item.path === a.path);
        if (!foundItem) {
          schemaModifications.schema.columns.push({
            id: getUniqueId(),
            sourceColumn: item.path,
            name: item.path,
            type: stream.metrics.includes(item.path) ? 'metric' : 'dimension',
          });
          schemaModifications.schema.sourceColumns.push({
            id: item.path,
            path: item.path,
          });
        }
      }
    });

    return [actions.setSelectedStreamKeyVal(schemaModifications)];
  });

const setSqlDataStreamSchema = (action$, {getState}) =>
  action$.ofType(actions.setSqlStreamDiametricsChange.TYPE, actions.removeSqlStreamDiametrics.TYPE).flatMap(() => {
    const stream = selectors.getSelectedDataStream(getState());

    const tableMeta = stream.uiState.tablesViewsMetadata;
    const schemaModifications = {
      schema: {...stream.schema},
    };

    // update existing schema items
    // eslint-disable-next-line complexity
    stream.schema.columns.forEach((item) => {
      if (
        item.sourceColumn !== undefined ||
        (item.transform &&
          item.transform.input &&
          item.transform.input.length === 1 &&
          getSourceIdsArr([], item).length === 1)
      ) {
        const sourceColumn = item.sourceColumn || getSourceIdsArr([], item)[0];
        if (stream.metrics.includes(sourceColumn)) {
          if (item.type === 'dimension') {
            // delete any copy of this item
            schemaModifications.schema.columns = schemaModifications.schema.columns.filter(
              (a) => sourceColumn !== a.sourceColumn || item.name === a.name,
            );
          }
          if (item.type !== 'metric') {
            item.type = 'metric';
            item.sourceColumn = sourceColumn;
            if (item.transform) {
              /*  eslint no-param-reassign: "off" */
              delete item.transform;
            }
          }
        } else if (stream.dimensions.includes(sourceColumn)) {
          if (item.type !== 'dimension') {
            item.type = 'dimension';
            item.sourceColumn = sourceColumn;
            if (item.transform) {
              /*  eslint no-param-reassign: "off" */
              delete item.transform;
            }
            if (item.targetType) {
              /*  eslint no-param-reassign: "off" */
              delete item.targetType;
            }
            if (item.metricTags) {
              /*  eslint no-param-reassign: "off" */
              delete item.metricTags;
            }
          }
        } else {
          schemaModifications.schema.columns = schemaModifications.schema.columns.filter(
            (a) => (a.sourceColumn !== undefined ? a.sourceColumn : getSourceIdsArr([], a)[0]) !== sourceColumn,
          );
          schemaModifications.schema.sourceColumns = schemaModifications.schema.sourceColumns.filter(
            (a) => a.name !== sourceColumn,
          );
        }
      } else {
        // user created columns check if source still exist
        if (item.transform && item.transform.input && item.transform.input.length > 1 && item.type === 'dimension') {
          getSourceIdsArr([], item).forEach((name) => {
            /*  eslint no-lonely-if: "off" */
            if (!stream.dimensions.includes(name)) {
              schemaModifications.schema.columns = schemaModifications.schema.columns.filter(
                (a) => a.name !== item.name,
              );
            }
          });
        }
      }
    });

    // check for unAssigned items added to metrics or dimensions
    tableMeta.forEach((item) => {
      if (stream.metrics.includes(item.name) || stream.dimensions.includes(item.name)) {
        const foundItem = schemaModifications.schema.sourceColumns.find((a) => item.name === a.name);
        if (!foundItem) {
          schemaModifications.schema.columns.push({
            id: getUniqueId(),
            sourceColumn: item.name,
            name: item.name,
            type: stream.metrics.includes(item.name) ? 'metric' : 'dimension',
          });
          schemaModifications.schema.sourceColumns.push({
            id: item.name,
            name: item.name,
          });
        }
      }
    });

    return [actions.setSelectedStreamKeyVal(schemaModifications)];
  });

const setBigQueryDataStreamSchema = (action$, {getState}) =>
  action$
    .ofType(actions.setBigQueryStreamDiametricsChange.TYPE, actions.removeBigQueryStreamDiametrics.TYPE)
    .flatMap(() => {
      const stream = selectors.getSelectedDataStream(getState());

      const tableMeta = stream.uiState.queryPreviewColumns;
      const schemaModifications = {
        schema: {...stream.schema},
      };

      // update existing schema items
      // eslint-disable-next-line complexity
      stream.schema.columns.forEach((item) => {
        if (
          item.sourceColumn !== undefined ||
          (item.transform &&
            item.transform.input &&
            item.transform.input.length === 1 &&
            getSourceIdsArr([], item).length === 1)
        ) {
          const sourceColumn = item.sourceColumn || getSourceIdsArr([], item)[0];
          if (stream.metrics.includes(sourceColumn)) {
            if (item.type === 'dimension') {
              // delete any copy of this item
              schemaModifications.schema.columns = schemaModifications.schema.columns.filter(
                (a) => sourceColumn !== a.sourceColumn || item.name === a.name,
              );
            }
            if (item.type !== 'metric') {
              item.type = 'metric';
              item.sourceColumn = sourceColumn;
              if (item.transform) {
                /*  eslint no-param-reassign: "off" */
                delete item.transform;
              }
            }
          } else if (stream.dimensions.includes(sourceColumn)) {
            if (item.type !== 'dimension') {
              item.type = 'dimension';
              item.sourceColumn = sourceColumn;
              if (item.transform) {
                /*  eslint no-param-reassign: "off" */
                delete item.transform;
              }
              if (item.targetType) {
                /*  eslint no-param-reassign: "off" */
                delete item.targetType;
              }
              if (item.metricTags) {
                /*  eslint no-param-reassign: "off" */
                delete item.metricTags;
              }
            }
          } else {
            schemaModifications.schema.columns = schemaModifications.schema.columns.filter(
              (a) => (a.sourceColumn !== undefined ? a.sourceColumn : getSourceIdsArr([], a)[0]) !== sourceColumn,
            );
            schemaModifications.schema.sourceColumns = schemaModifications.schema.sourceColumns.filter(
              (a) => a.name !== sourceColumn,
            );
          }
        } else {
          // user created columns check if source still exist
          if (item.transform && item.transform.input && item.transform.input.length > 1 && item.type === 'dimension') {
            getSourceIdsArr([], item).forEach((name) => {
              /*  eslint no-lonely-if: "off" */
              if (!stream.dimensions.includes(name)) {
                schemaModifications.schema.columns = schemaModifications.schema.columns.filter(
                  (a) => a.name !== item.name,
                );
              }
            });
          }
        }
      });

      // check for unAssigned items added to metrics or dimensions
      tableMeta.forEach((item) => {
        if (stream.metrics.includes(item.name) || stream.dimensions.includes(item.name)) {
          const foundItem = schemaModifications.schema.sourceColumns.find((a) => item.name === a.name);
          if (!foundItem) {
            schemaModifications.schema.columns.push({
              id: getUniqueId(),
              sourceColumn: item.name,
              name: item.name,
              type: stream.metrics.includes(item.name) ? 'metric' : 'dimension',
            });
            schemaModifications.schema.sourceColumns.push({
              id: item.name,
              name: item.name,
            });
          }
        }
      });

      return [actions.setSelectedStreamKeyVal(schemaModifications)];
    });

const setMParticleDataStreamSchema = (action$, {getState}) =>
  action$
    .ofType(actions.setMParticleStreamDiametricsChange.TYPE, actions.removeMParticleStreamDiametrics.TYPE)
    .flatMap(() => {
      const stream = selectors.getSelectedDataStream(getState());
      const getEmptyArray = () => (stream.metrics.length || stream.dimensions.length ? [] : EMPTY_ARRAY);
      const res = {
        schema: {
          columns: getEmptyArray(),
          sourceColumns: getEmptyArray(),
        },
      };
      const {eventMetadata} = stream.uiState;
      const items = eventMetadata.metrics.concat(eventMetadata.dimensions);
      stream.metrics.forEach((item) => {
        const metric = items.find((m) => m.path === item);
        res.schema.columns.push({
          id: getUniqueId(),
          sourceColumn: metric.path,
          name: metric.name,
          type: 'metric',
        });
        res.schema.sourceColumns.push({
          id: metric.path,
          path: metric.path,
        });
      });

      stream.dimensions.forEach((item) => {
        const dimension = items.find((m) => m.path === item);
        res.schema.columns.push({
          id: getUniqueId(),
          sourceColumn: dimension.path,
          name: dimension.name,
          type: 'dimension',
        });
        res.schema.sourceColumns.push({
          id: dimension.path,
          path: dimension.path,
        });
      });

      return [actions.setSelectedStreamKeyVal(res)];
    });

const setMixpanelDataStreamSchema = (action$, {getState}) =>
  action$
    .ofType(
      actions.setMixpanelStreamDiametricsChange.TYPE,
      actions.removeMixpanelStreamDiametrics.TYPE,
      actions.setMixpanelStreamClearAllDiametrics.TYPE,
    )
    .flatMap(() => {
      const stream = selectors.getSelectedDataStream(getState());
      const getEmptyArray = () => (stream.metrics.length || stream.dimensions.length ? [] : EMPTY_ARRAY);
      const res = {
        schema: {
          columns: getEmptyArray(),
          sourceColumns: getEmptyArray(),
        },
      };
      const {eventMetadata} = stream.uiState;

      stream.metrics.forEach((item) => {
        const metric = eventMetadata.find((m) => m.name === item);
        res.schema.columns.push({
          id: getUniqueId(),
          sourceColumn: metric.name,
          name: metric.name,
          type: 'metric',
        });
        res.schema.sourceColumns.push({
          id: metric.name,
          name: metric.name,
          path: metric.path,
        });
      });

      stream.dimensions.forEach((item) => {
        const dimension = eventMetadata.find((m) => m.name === item);
        res.schema.columns.push({
          id: getUniqueId(),
          sourceColumn: dimension.name,
          name: dimension.name,
          type: 'dimension',
        });
        res.schema.sourceColumns.push({
          id: dimension.name,
          name: dimension.name,
          path: dimension.path,
        });
      });

      return [actions.setSelectedStreamKeyVal(res)];
    });

const setSegmentDataStreamSchema = (action$, {getState}) =>
  action$
    .ofType(actions.setSegmentStreamDiametricsChange.TYPE, actions.removeSegmentStreamDiametrics.TYPE)
    .flatMap(() => {
      const stream = selectors.getSelectedDataStream(getState());
      const getEmptyArray = () => (stream.metrics.length || stream.dimensions.length ? [] : EMPTY_ARRAY);
      const res = {
        schema: {
          columns: getEmptyArray(),
          sourceColumns: getEmptyArray(),
        },
      };
      const {messageMetadata} = stream.uiState;
      const metrics = messageMetadata.metrics || EMPTY_ARRAY;
      const dimensions = messageMetadata.dimensions || EMPTY_ARRAY;
      const items = metrics.concat(dimensions);
      stream.metrics.forEach((item) => {
        const metric = items.find((m) => m.path === item);
        res.schema.columns.push({
          id: getUniqueId(),
          sourceColumn: metric.path,
          name: metric.name,
          type: 'metric',
        });
        res.schema.sourceColumns.push({
          id: metric.path,
          path: metric.path,
        });
      });

      stream.dimensions.forEach((item) => {
        const dimension = items.find((m) => m.path === item);
        res.schema.columns.push({
          id: getUniqueId(),
          sourceColumn: dimension.path,
          name: dimension.name,
          type: 'dimension',
        });
        res.schema.sourceColumns.push({
          id: dimension.path,
          path: dimension.path,
        });
      });

      return [actions.setSelectedStreamKeyVal(res)];
    });

const setAuctionsDataStreamSchema = (action$, {getState}) =>
  action$
    .ofType(actions.setAuctionStreamDiametricsChange.TYPE, actions.removeAuctionStreamDiametrics.TYPE)
    .flatMap(() => {
      const stream = selectors.getSelectedDataStream(getState());
      const tableMeta = stream.uiState.analysisResult.fileSchema;
      const schemaModifications = {
        schema: {...stream.schema},
      };

      // update existing schema items
      // eslint-disable-next-line complexity
      stream.schema.columns.forEach((item) => {
        if (
          item.sourceColumn !== undefined ||
          (item.transform &&
            item.transform.input &&
            item.transform.input.length === 1 &&
            getSourceIdsArr([], item).length === 1)
        ) {
          const sourceColumn = item.sourceColumn || getSourceIdsArr([], item)[0];
          if (stream.metrics.includes(sourceColumn)) {
            if (item.type === 'dimension') {
              // delete any copy of this item
              schemaModifications.schema.columns = schemaModifications.schema.columns.filter(
                (a) => sourceColumn !== a.sourceColumn || item.name === a.name,
              );
            }
            if (item.type !== 'metric') {
              item.type = 'metric';
              item.sourceColumn = sourceColumn;
              if (item.transform) {
                /*  eslint no-param-reassign: "off" */
                delete item.transform;
              }
            }
          } else if (stream.dimensions.includes(sourceColumn)) {
            if (item.type !== 'dimension') {
              item.type = 'dimension';
              item.sourceColumn = sourceColumn;
              if (item.transform) {
                /*  eslint no-param-reassign: "off" */
                delete item.transform;
              }
              if (item.targetType) {
                /*  eslint no-param-reassign: "off" */
                delete item.targetType;
              }
              if (item.metricTags) {
                /*  eslint no-param-reassign: "off" */
                delete item.metricTags;
              }
            }
          } else {
            schemaModifications.schema.columns = schemaModifications.schema.columns.filter(
              (a) => (a.sourceColumn !== undefined ? a.sourceColumn : getSourceIdsArr([], a)[0]) !== sourceColumn,
            );
            schemaModifications.schema.sourceColumns = schemaModifications.schema.sourceColumns.filter(
              (a) => a.name !== sourceColumn,
            );
          }
        } else {
          // user created columns check if source still exist
          if (item.transform && item.transform.input && item.transform.input.length > 1 && item.type === 'dimension') {
            getSourceIdsArr([], item).forEach((name) => {
              /*  eslint no-lonely-if: "off" */
              if (!stream.dimensions.includes(name)) {
                schemaModifications.schema.columns = schemaModifications.schema.columns.filter(
                  (a) => a.name !== item.name,
                );
              }
            });
          }
        }
      });

      // check for unAssigned items added to metrics or dimensions
      tableMeta.forEach((item) => {
        if (stream.metrics.includes(item.name) || stream.dimensions.includes(item.name)) {
          const foundItem = schemaModifications.schema.sourceColumns.find((a) => item.name === a.name);
          if (!foundItem) {
            schemaModifications.schema.columns.push({
              id: getUniqueId(),
              sourceColumn: item.name,
              name: item.name,
              type: stream.metrics.includes(item.name) ? 'metric' : 'dimension',
            });
            schemaModifications.schema.sourceColumns.push({
              id: item.name,
              name: item.name,
            });
          }
        }
      });

      return [actions.setSelectedStreamKeyVal(schemaModifications)];
    });

const setSalesforceDataStreamSchema = (action$, {getState}) =>
  action$
    .ofType(actions.setSalesforceStreamDiametricsChange.TYPE, actions.removeSalesforceStreamDiametrics.TYPE)
    .flatMap(() => {
      const stream = selectors.getSelectedDataStream(getState());
      const tableMeta = stream.uiState.eventMetadata.fields;
      const schemaModifications = {
        schema: {...stream.schema},
      };

      // update existing schema items
      // eslint-disable-next-line complexity
      stream.schema.columns.forEach((item) => {
        if (
          item.sourceColumn !== undefined ||
          (item.transform &&
            item.transform.input &&
            item.transform.input.length === 1 &&
            getSourceIdsArr([], item).length === 1)
        ) {
          const sourceColumn = item.sourceColumn || getSourceIdsArr([], item)[0];
          if (stream.metrics.includes(sourceColumn)) {
            if (item.type === 'dimension') {
              // delete any copy of this item
              schemaModifications.schema.columns = schemaModifications.schema.columns.filter(
                (a) => sourceColumn !== a.sourceColumn || item.name === a.name,
              );
            }
            if (item.type !== 'metric') {
              item.type = 'metric';
              item.sourceColumn = sourceColumn;
              if (item.transform) {
                /*  eslint no-param-reassign: "off" */
                delete item.transform;
              }
            }
          } else if (stream.dimensions.includes(sourceColumn)) {
            if (item.type !== 'dimension') {
              item.type = 'dimension';
              item.sourceColumn = sourceColumn;
              if (item.transform) {
                /*  eslint no-param-reassign: "off" */
                delete item.transform;
              }
              if (item.targetType) {
                /*  eslint no-param-reassign: "off" */
                delete item.targetType;
              }
              if (item.metricTags) {
                /*  eslint no-param-reassign: "off" */
                delete item.metricTags;
              }
            }
          } else {
            schemaModifications.schema.columns = schemaModifications.schema.columns.filter(
              (a) => (a.sourceColumn !== undefined ? a.sourceColumn : getSourceIdsArr([], a)[0]) !== sourceColumn,
            );
            schemaModifications.schema.sourceColumns = schemaModifications.schema.sourceColumns.filter(
              (a) => a.id !== sourceColumn,
            );
          }
        } else {
          // user created columns check if source still exist
          if (item.transform && item.transform.input && item.transform.input.length > 1 && item.type === 'dimension') {
            getSourceIdsArr([], item).forEach((name) => {
              /*  eslint no-lonely-if: "off" */
              if (!stream.dimensions.includes(name)) {
                schemaModifications.schema.columns = schemaModifications.schema.columns.filter(
                  (a) => a.name !== item.name,
                );
              }
            });
          }
        }
      });

      // check for unAssigned items added to metrics or dimensions
      tableMeta.forEach((item) => {
        const itemName = `${stream.objects[0]}.${item.name}`;
        if (stream.metrics.includes(itemName) || stream.dimensions.includes(itemName)) {
          const foundItem = schemaModifications.schema.sourceColumns.find((a) => itemName === a.name);
          if (!foundItem) {
            schemaModifications.schema.columns.push({
              id: getUniqueId(),
              sourceColumn: itemName,
              name: itemName,
              type: stream.metrics.includes(itemName) ? 'metric' : 'dimension',
            });
            schemaModifications.schema.sourceColumns.push({
              id: itemName,
              name: itemName,
            });
          }
        }
      });

      return [actions.setSelectedStreamKeyVal(schemaModifications)];
    });

const setNewrelicDataStreamSchema = (action$, {getState}) =>
  action$
    .ofType(actions.setNewrelicStreamDiametricsChange.TYPE, actions.removeNewrelicStreamDiametrics.TYPE)
    .flatMap(() => {
      const stream = selectors.getSelectedDataStream(getState());
      const getEmptyArray = () => (stream.metrics.length || stream.dimensions.length ? [] : EMPTY_ARRAY);
      const res = {
        schema: {
          columns: getEmptyArray(),
          sourceColumns: getEmptyArray(),
        },
      };
      const {keyset} = stream.uiState;
      const metrics = keyset.metrics || EMPTY_ARRAY;
      const dimensions = keyset.dimensions || EMPTY_ARRAY;
      const items = metrics.concat(dimensions);
      stream.metrics.forEach((item) => {
        const metric = items.find((m) => m === item);
        res.schema.columns.push({
          id: getUniqueId(),
          sourceColumn: metric,
          name: metric,
          type: 'metric',
        });
        res.schema.sourceColumns.push({
          id: metric,
          name: metric,
        });
      });

      stream.dimensions.forEach((item) => {
        const dimension = items.find((m) => m === item);
        res.schema.columns.push({
          id: getUniqueId(),
          sourceColumn: dimension,
          name: dimension,
          type: 'dimension',
        });
        res.schema.sourceColumns.push({
          id: dimension,
          name: dimension,
        });
      });

      return [actions.setSelectedStreamKeyVal(res)];
    });

const setStreamSchemaKeyValWithUpdateDataStream = (action$, {getState}) =>
  action$.ofType(actions.setStreamSchemaColumnKeyVal.TYPE).flatMap((action) => {
    if (get(action.meta, 'runStreamPreview', false)) {
      return [actions.updateDataStream(selectors.getSelectedDataStream(getState()), {isPreviewMode: true})];
    }

    return [];
  });
const setStreamSchemaTransform = (action$, {getState}) =>
  action$
    .ofType(actions.setStreamSchemaTransform.TYPE, actions.deleteStreamSchemaColumn.TYPE)
    .flatMap(() => [actions.updateDataStream(selectors.getSelectedDataStream(getState()), {isPreviewMode: true})]);

const createStreamSchemaNewColumn = (action$, {getState}) =>
  action$.ofType(actions.createStreamSchemaNewColumn.TYPE).flatMap((action) => {
    if (get(action.meta, 'runStreamPreview', true)) {
      return [actions.updateDataStream(selectors.getSelectedDataStream(getState()), {isPreviewMode: true})];
    }

    return [
      actions.updateDataStream(selectors.getSelectedDataStream(getState()), {
        isPreviewMode: false,
        goToTablePreviewRoute: get(action.meta, 'goToTablePreviewRoute', false),
        isAsyncQuery: get(action.meta, 'isAsyncQuery', false),
      }),
    ];
  });

const setStreamSchemaTransformComplete = (action$, {getState}) =>
  action$.ofType(actions.updateDataStream.success.TYPE).flatMap(({meta}) => {
    if (get(meta, 'isPreviewMode', false)) {
      return [actions.getStreamPreviewData()];
    }
    if (get(meta, 'goToTablePreviewRoute', false)) {
      const stream = selectors.getSelectedDataStream(getState());
      return [setRoute(`/bc/data-streams/${stream.id}/preview`)];
    }
    if (get(meta, 'isAsyncQuery', false)) {
      const stream = selectors.getSelectedDataStream(getState());
      return [actions.runAsyncQueryDataStream(stream)];
    }
    return [];
  });

const resetSelectedDataStream = (action$) =>
  action$.ofType(routeChangedAction.TYPE).flatMap(({payload: {newURL}}) => {
    const isDataManager = (url) => url.indexOf('data-manager') > 0;
    const outOfBc = (url) => url.indexOf('/bc/') < 0;

    if (isDataManager(newURL) || outOfBc(newURL)) {
      return [actions.setSelectedDataStream()];
    }
    return [];
  });

const asyncQueryDataStream = makeAsyncEpic(actions.runAsyncQueryDataStream, api.runAsyncDataStreamQuery);

const setStreamUniqeId = (action$) =>
  action$.ofType(actions.runAsyncQueryDataStream.success.TYPE).flatMap(({payload}) => {
    const stream = {
      ...payload,
      uiState: {
        ...payload.uiState,
        uniqeQueryId: getStreamQueryId(payload),
      },
    };
    return [actions.updateDataStream(stream)];
  });

const fetchStreamsStats = makeAsyncEpic(actions.fetchStreamsStats, api.fetchStreamsStats);

const getStreamAssets = (action$) =>
  action$.ofType(actions.getStreamAssets.TYPE).flatMap(({payload}) => {
    if (payload) {
      return [
        actions.fetchStreamDashboards({schemaType: 'dashboards', streamId: payload}),
        actions.fetchStreamDashboardsV2({schemaType: 'dashboardsV2', streamId: payload}),
        actions.fetchStreamAlerts(payload),
        actions.fetchStreamComposites(payload),
        actions.setViewStreamSummaryAssets({
          assets: {
            streamId: payload,
            alerts: {
              isLoading: true,
              items: [],
              total: 0,
            },
            composites: {
              isLoading: true,
              items: [],
              total: 0,
            },
            dashboards: {
              isLoading: true,
              items: [],
              total: 0,
            },
            dashboardsV2: {
              isLoading: true,
              items: [],
              total: 0,
            },
          },
        }),
      ];
    }
    return [];
  });

const fetchStreamDashboardsSuccess = (action$) =>
  action$.ofType(actions.fetchStreamDashboards.success.TYPE).flatMap(({payload}) => {
    if (payload) {
      return [
        actions.setViewStreamSummaryAssets({
          assetType: 'dashboards',
          asset: {
            items: payload.dashboards.map((item) => ({
              id: item._id,
              title: item.name,
              ownerId: item.ownerUser,
            })),
            total: payload.total,
          },
        }),
      ];
    }
    return [
      actions.setViewStreamSummaryAssets({
        assetType: 'dashboards',
        asset: {
          items: [],
          total: 0,
        },
      }),
    ];
  });

const fetchStreamDashboardsV2Success = (action$) =>
  action$.ofType(actions.fetchStreamDashboardsV2.success.TYPE).flatMap(({payload}) => {
    if (payload) {
      return [
        actions.setViewStreamSummaryAssets({
          assetType: 'dashboardsV2',
          asset: {
            items: payload.dashboards.map((item) => ({
              id: item._id,
              title: item.name,
              ownerId: item.ownerUser,
            })),
            total: payload.total,
          },
        }),
      ];
    }
    return [
      actions.setViewStreamSummaryAssets({
        assetType: 'dashboardsV2',
        asset: {
          items: [],
          total: 0,
        },
      }),
    ];
  });

const fetchStreamAlertsSuccess = (action$) =>
  action$.ofType(actions.fetchStreamAlerts.success.TYPE).flatMap(({payload}) => {
    if (payload) {
      return [
        actions.setViewStreamSummaryAssets({
          assetType: 'alerts',
          asset: {
            items: payload.alerts.map((item) => ({
              id: item.data.id,
              ownerId: item.data.ownerId,
              severity: item.data.severity,
              title: item.data.title,
              validation: item.data.validation,
            })),
            total: payload.total,
          },
        }),
      ];
    }
    return [
      actions.setViewStreamSummaryAssets({
        assetType: 'alerts',
        asset: {
          items: [],
          total: 0,
        },
      }),
    ];
  });

const fetchStreamCompositesSuccess = (action$) =>
  action$.ofType(actions.fetchStreamComposites.success.TYPE).flatMap(({payload}) => {
    if (payload) {
      return [
        actions.setViewStreamSummaryAssets({
          assetType: 'composites',
          asset: {
            items: payload.composites.map((item) => ({
              id: item.id,
              title: item.title,
              owner: item.owner,
            })),
            total: payload.total,
          },
        }),
      ];
    }
    return [
      actions.setViewStreamSummaryAssets({
        assetType: 'composites',
        asset: {
          items: [],
          total: 0,
        },
      }),
    ];
  });

const dataStreamsEpic = combineEpics(
  setSelectedEditor,
  openSchedulerEditor,
  fetchDataStreams,
  refetchDataStreamsIfDuringFetchStreamsArrayWasChanged,
  createDataStream,
  duplicateStream,
  deletePipelineStream,
  refrashPipeline,
  saveDataStreamSuccess,
  autoSaveDataStream1,
  autoSaveDataStream2,
  autoSaveDataStream3,
  autoSaveDataStream4,
  autoSaveDataStream5,
  autoSaveDataStream6,
  autoSaveDataStream7,
  autoSaveDataStream8,
  autoSaveDataStream9,
  autoSaveDataStream10,
  autoSaveDataStream11,
  autoSaveDataStreamDebounced,
  updateDataStream,
  updateDataStreamScheduling,
  startDataStream,
  pauseResumeDataStream,
  dataStreamStarted,
  deleteDataStream,
  editDataStream,
  editLiveStream,
  previewDataStream,
  getStreamPreviewData,
  fetchDataStreamPreview,
  fetchLiveDataStreamPreview,
  fetchLiveDataStreamStats,
  fetchDataStreamStats,
  fetchDataStreamCardinality,
  fetchStreamLastRun,
  fetchStreamHistoryLogs,
  fetchTransformFunctions,
  setGaDataStreamSchema,
  setAdobeDataStreamSchema,
  setFileDiametricsDataStreamSchema,
  setSqlDataStreamSchema,
  setKinesisDiametricsDataStreamSchema,
  setBigQueryDataStreamSchema,
  setMParticleDataStreamSchema,
  setSegmentDataStreamSchema,
  setAuctionsDataStreamSchema,
  setSalesforceDataStreamSchema,
  setStreamSchemaKeyValWithUpdateDataStream,
  setNewrelicDataStreamSchema,
  createStreamSchemaNewColumn,
  setStreamSchemaTransform,
  setStreamSchemaTransformComplete,
  resetSelectedDataStream,
  asyncQueryDataStream,
  setStreamUniqeId,
  updateStreamName,
  setMixpanelDataStreamSchema,
  updateAgentOwner,
  fetchDashAndAlertTitles,
  fetchStreamsStats,
  fetchStreamDashboards,
  fetchStreamDashboardsV2,
  fetchStreamAlerts,
  fetchStreamComposites,
  getStreamAssets,
  fetchStreamDashboardsSuccess,
  fetchStreamDashboardsV2Success,
  fetchStreamAlertsSuccess,
  fetchStreamCompositesSuccess,
  setGoogleStorageStreamDiametricsAnalysisSchema,
  deleteImpacts,
  editEventStream,
  createEventStream,
  previewEventStream,
);
export default dataStreamsEpic;
