#include <stdio.h>
#include <unistd.h>
#include <malloc.h>
#include <pthread.h>
#include <semaphore.h>
struct queue {
    void *data;
    size_t size;
    struct queue *next;
};
//Очередь
struct queue *buff;

//Мьютекс для защиты очереди
pthread_mutex_t buff_mutex=PTHREAD_MUTEX_INITIALIZER;
//семафор для очереди
sem_t buff_count;
//condition variable для того чтобы ридер знал когда врайтер что-то записал
pthread_cond_t reader_cv=PTHREAD_COND_INITIALIZER;
//мьютекс для condition variable
pthread_mutex_t reader_mutex=PTHREAD_MUTEX_INITIALIZER;

//Фунция записи из буфера в стдаут
//работает отдельным витком
void* writer(void *arg){
    struct queue *data,*prev;
    while(1){
        sem_wait(&buff_count);//что-то в очереди есть теперь
	pthread_mutex_lock(&buff_mutex);//блокируем доступ к очереди
	if(!buff){
	    pthread_mutex_unlock(&buff_mutex);//разблокируем доступ к очереди
	    break;//выходим
	}
        data=buff;//сохраняем голову
	prev=buff;
        while(data->next){//ищем последний элемент очереди
    	    prev=data;
	    data=data->next;
        }
	if(prev==data){//если в очереди 1 элемент
	    buff=NULL;//обнуляем голову
        }else{
    	    prev->next=NULL;//обнуляем ссылку на последний элемент очереди
	}
	/*в data содержится ссылка на данные и очередь в порядке*/
	pthread_mutex_unlock(&buff_mutex);//разблокируем доступ к очереди
	/*Выводим данные и освобождаем память*/
	write(1,data->data,data->size);
	//fprintf(stderr,"%d bytes writed\n",data->size);
	free(data->data);
	free(data);
	pthread_cond_signal(&reader_cv);
    }
}

#define BUFFSIZE 32	//метров
#define CLUSTER 1048576 //1МБ

int main(int argc, char *argv[]){
    size_t readed=0;
    struct queue *data;
    pthread_t tid;
    int smv=0,s=0;
    
    
    if(argc<2){
	fprintf(stderr,"Usage : %s <buffer_size> in MB\nLike %s 128\n",argv[0],argv[0]);
	return -1;
    }
    s=atoi(argv[1]);
    if((s<2)||(s>4096)){
	s=BUFFSIZE;
	fprintf(stderr,"Buffer size incorect, using default %dMB\n",BUFFSIZE);
	
    }
    
    buff=NULL;
    sem_init(&buff_count,0,0);

    pthread_create(&tid,NULL,&writer,NULL);
    pthread_mutex_lock(&reader_mutex);
    while(1){
	sem_getvalue(&buff_count,&smv);
	if(smv>=s){//если у нас переполнена очередь - ждём сигнала от врайтера, что он записал что-то
		
	    /*внутри pthread_cond_wait мьютекс разблокируется, ждётся сигнал и мьютекс блокируется
		*/
		pthread_cond_wait(&reader_cv,&reader_mutex);
	};
	    
	data=malloc(sizeof(struct queue));
	if(!data)break;
        data->data=malloc(CLUSTER);
	if(!data->data)break;
	
    
	//readed=read(0,data->data,CLUSTER);//прочитает сколько отдаст pipe - возможно нерациональное использование памяти
	readed=fread(data->data,1,CLUSTER,stdin);//прочитает чётко кластер - всё равно транслируется в 1 или несколько read
        if(!readed)break;//закончился файл
	data->size=readed;
	
	pthread_mutex_lock(&buff_mutex);
        data->next=buff;
	buff=data;
        pthread_mutex_unlock(&buff_mutex);
        /*данные готовы - можно семафорить*/
	sem_post(&buff_count);
        //fprintf(stderr,"queue size=%d, %d bytes readed\n",smv,readed);
	fprintf(stderr,"queue size=%d\t\r",smv);
    }
    fprintf(stderr,"waiting for thread\r");
    /*говорим что мы что-то добавили, но на самом деле не добавили (добавили конец файла :)
    таким образом врайтер получит семафор и выйдет по пустому буферу
    */
    sem_post(&buff_count);
    pthread_join(tid,NULL);//ждём окончания витка
    //Чистимся
    pthread_mutex_destroy(&buff_mutex);
    sem_destroy(&buff_count);
    pthread_mutex_destroy(&reader_mutex);
    pthread_cond_destroy(&reader_cv);
    return 0;
}
