import { Injectable } from '@angular/core';
import { Observable, of, ConnectableObservable } from 'rxjs';
import { last, tap, catchError, mergeMap, finalize, publishLast } from 'rxjs/operators';
import { LoggerService, ILogger } from './logger.service';
import { QueueTopic } from '../model/queue-topic';

@Injectable()
export class QueueService {

  private logger: ILogger;
  private topics: { [key: string]: Observable<any> } = {};

  constructor(
    logger: LoggerService
  ) { 
    this.logger = logger.getLogger('QueueService');
  }

  public queue<T>(request: Observable<T>, queueTopic: QueueTopic): Observable<T> {
    const topic = queueTopic.toString();
    
    this.logger.debug('Queuing observable in topic', topic);

    const queued = (this.topics[topic] || of(null)).pipe(
      catchError(_ => of(null)),
      mergeMap(_ => <Observable<T>>request),
      finalize(() => delete this.topics[topic]),
      publishLast()
    );

    this.topics[topic] = queued;

    (<ConnectableObservable<any>>queued).connect();

    return queued;
  }
}
