这是一个(非常)古老的帖子,但我最近遇到了类似的问题。具体来说,我需要把stdin原样复制到stdout,同时再复制一份写入一个非阻塞的管道。第一个答案中提出的ftee确实帮了大忙,但对我的用例来说丢数据太严重了:只要读取端没能及时取走数据,那些本来可以处理的数据就会丢失。
我面临的情况是我有一个进程(some_process),它聚合一些数据并每三秒钟将其结果写入stdout。简化后的设置如下(在实际设置中,我使用的是命名管道):
some_process | ftee >(onlineAnalysis.pl > results) | gzip > raw_data.gz
现在,raw_data.gz必须是压缩的,而且必须完整,ftee很好地做到了这一点。但我放在中间的那个管道读取端太慢,来不及抓取每次刷新出来的数据;不过只要能拿到数据,它处理所有内容的速度是足够快的——这一点已经用普通的tee验证过。然而,一旦未命名管道出了任何问题,普通的tee就会阻塞,而我希望能随时按需接入,所以tee不可行。回到正题:在中间加一个缓冲区后情况有所好转,命令变成:
some_process | ftee >(mbuffer -m 32M| onlineAnalysis.pl > results) | gzip > raw_data.gz
但这样仍然会丢失本来可以处理的数据。因此,我进一步把前面提出的ftee扩展成了带缓冲的版本(bftee)。它保留了原有的全部特性,但在写入管道失败时会改用一个(可能不太高效的)内部缓冲区。如果缓冲区满了,数据仍然会丢失,但对我的场景来说效果很好。和往常一样,它还有很大的改进空间;不过既然我的代码是从这里复制来的,我也想把它分享给可能需要的人。
#include <errno.h>
#include <limits.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
enum {
    /* Default number of queue slots (blocks) when no BufferSize argument is given. */
    BUFFER_SIZE = 4096,
    /* Capacity in bytes of one sBuffer block; main() reads stdin in chunks of this size. */
    BLOCK_SIZE = 2048
};
/*
 * One chunk of stream data: a fixed BLOCK_SIZE byte array of which only
 * the first `bytes` bytes are meaningful.
 */
typedef struct sBuffer {
    char data[BLOCK_SIZE]; /* payload storage */
    int  bytes;            /* number of valid bytes at the start of data */
} sBuffer;
/*
 * Fixed-capacity ring of sBuffer slots used to hold data the FIFO could not
 * accept yet, together with the statistics printed on SIGUSR1.
 */
typedef struct sQueue {
    sBuffer *data;      /* array of bufferSize slots */
    int      bufferSize;/* ring capacity, in slots */
    int      start;     /* index of the oldest queued block */
    int      end;       /* index where the next pushed block is stored */
    int      active;    /* number of blocks currently queued */
    int      maxUse;    /* highest value `active` has reached */
    int      drops;     /* blocks overwritten because the ring was full */
    int      sWrites;   /* write attempts to stdout */
    int      pWrites;   /* blocks written completely to the pipe */
} sQueue;
/* Ring-buffer operations (definitions follow main). */
void InitQueue(sQueue *queue, int bufferSize);
void PushToQueue(sQueue *queue, sBuffer *p, int offset);
sBuffer *RetrieveFromQueue(sQueue *queue);
sBuffer *PeakAtQueue(sQueue *queue);
void ShrinkInQueue(sQueue *queue, int bytes);
void DelFromQueue(sQueue *queue);

/* Signal handlers installed by main. */
static void sigUSR1(int signo);
static void sigINT(int signo);
/* File-scope so the signal handlers can report on it. */
sQueue queue;
/*
 * Stop request set by sigINT. sig_atomic_t (not plain int) is the only object
 * type ISO C guarantees a signal handler may safely write to.
 */
volatile sig_atomic_t quit;
/*
 * Write all len bytes of buf to fd, retrying after partial writes and EINTR.
 * Returns 0 on success, -1 on failure (errno set by write()).
 * Used for stdout, which feeds the archive and must never lose data.
 */
static int writeAll(int fd, const char *buf, size_t len)
{
    while (len > 0) {
        ssize_t n = write(fd, buf, len);
        if (n < 0 && errno == EINTR)
            continue;
        if (n <= 0)
            return -1;
        buf += n;
        len -= (size_t)n;
    }
    return 0;
}

/*
 * Parse the optional BufferSize argument (number of queue blocks).
 * Anything that is not a positive int falls back to BUFFER_SIZE, so a
 * negative value can never reach calloc() in InitQueue.
 */
static int parseBufferSize(const char *arg)
{
    char *end;
    long value;

    errno = 0;
    value = strtol(arg, &end, 10);
    if (end == arg || errno == ERANGE || value <= 0 || value > INT_MAX) {
        fprintf(stderr, "bftee: invalid BufferSize '%s', using %d\n", arg, BUFFER_SIZE);
        return BUFFER_SIZE;
    }
    return (int)value;
}

/*
 * Non-blocking forward of one freshly read block to the FIFO.
 * Queued blocks are sent first so the pipe sees data in order; whatever the
 * FIFO does not accept (full, no reader, partial write) is kept in the queue
 * to be retried on the next call.
 */
static void forwardToFifo(int writefd, sBuffer *buffer)
{
    sBuffer *pending;
    ssize_t n;

    while ((pending = PeakAtQueue(&queue)) != NULL) {
        n = write(writefd, pending->data, (size_t)pending->bytes);
        if (n == pending->bytes) {
            DelFromQueue(&queue);
            queue.pWrites++;
            continue;
        }
        if (n > 0)
            ShrinkInQueue(&queue, (int)n);
        /* Backlog not drained: the new block must wait behind it. */
        PushToQueue(&queue, buffer, 0);
        return;
    }

    n = write(writefd, buffer->data, (size_t)buffer->bytes);
    if (n == buffer->bytes)
        queue.pWrites++;
    else
        PushToQueue(&queue, buffer, (int)n); /* n < 0 is treated as offset 0 */
}

/*
 * Switch the FIFO to blocking mode and write out everything still queued.
 * Stops on a write error (e.g. EPIPE when the reader has gone away) instead
 * of retrying forever; partial writes only remove the bytes actually sent.
 */
static void flushQueue(int writefd)
{
    sBuffer *pending;
    int flags = fcntl(writefd, F_GETFL);

    if (flags == -1 || fcntl(writefd, F_SETFL, flags & ~O_NONBLOCK) == -1) {
        perror("bftee: fcntl");
        return;
    }
    while ((pending = PeakAtQueue(&queue)) != NULL) {
        ssize_t n = write(writefd, pending->data, (size_t)pending->bytes);
        if (n < 0 && errno == EINTR)
            continue;
        if (n <= 0) {
            perror("bftee: flushing buffer to fifo");
            return;
        }
        if (n == pending->bytes)
            DelFromQueue(&queue);
        else
            ShrinkInQueue(&queue, (int)n);
    }
}

/*
 * bftee: copy stdin to stdout completely, and best-effort (non-blocking,
 * buffered) to the named pipe given as argv[1]. Optional argv[2] is the
 * number of BLOCK_SIZE blocks to buffer while the pipe cannot keep up.
 */
int main(int argc, char *argv[])
{
    int readfd, writefd;
    struct stat status;
    char *fifonam;
    sBuffer buffer;
    ssize_t bytes;
    int bufferSize = BUFFER_SIZE;

    /* A vanished pipe reader must show up as EPIPE from write(), not kill us. */
    signal(SIGPIPE, SIG_IGN);
    signal(SIGUSR1, sigUSR1);
    signal(SIGTERM, sigINT);
    signal(SIGINT, sigINT);

    if (argc < 2 || argc > 3) {
        /* stderr, not stdout: stdout is the data stream being teed. */
        fprintf(stderr, "Usage:\n someprog 2>&1 | %s FIFO [BufferSize]\n"
                "FIFO - path to a named pipe, required argument\n"
                "BufferSize - temporary Internal buffer size in case write to FIFO fails\n", argv[0]);
        exit(EXIT_FAILURE);
    }
    fifonam = argv[1];
    if (argc == 3)
        bufferSize = parseBufferSize(argv[2]);

    /*
     * Open a temporary read end first: opening a FIFO O_WRONLY|O_NONBLOCK
     * fails (ENXIO) while no process has it open for reading.
     */
    readfd = open(fifonam, O_RDONLY | O_NONBLOCK);
    if (readfd == -1) {
        perror("bftee: readfd: open()");
        exit(EXIT_FAILURE);
    }
    if (fstat(readfd, &status) == -1) {
        perror("bftee: fstat");
        close(readfd);
        exit(EXIT_FAILURE);
    }
    if (!S_ISFIFO(status.st_mode)) {
        fprintf(stderr, "bftee: %s is not a fifo!\n", fifonam);
        close(readfd);
        exit(EXIT_FAILURE);
    }
    writefd = open(fifonam, O_WRONLY | O_NONBLOCK);
    if (writefd == -1) {
        perror("bftee: writefd: open()");
        close(readfd);
        exit(EXIT_FAILURE);
    }
    close(readfd);

    InitQueue(&queue, bufferSize);
    quit = 0;
    while (!quit) {
        bytes = read(STDIN_FILENO, buffer.data, sizeof(buffer.data));
        if (bytes < 0 && errno == EINTR)
            continue;
        if (bytes < 0) {
            perror("bftee: reading stdin");
            break;
        }
        if (bytes == 0)
            break; /* EOF */
        buffer.bytes = (int)bytes;

        if (writeAll(STDOUT_FILENO, buffer.data, (size_t)buffer.bytes) == -1) {
            perror("bftee: writing to stdout");
            break;
        }
        queue.sWrites++;

        forwardToFifo(writefd, &buffer);
    }

    /* On EOF or the first SIGINT/SIGTERM, try to hand the backlog to the pipe. */
    if (queue.active > 0)
        flushQueue(writefd);
    close(writefd);
    return EXIT_SUCCESS;
}
/*
 * Allocate a ring of bufferSize zeroed slots and reset all counters.
 * Exits the program if bufferSize is not positive or allocation fails,
 * because every later queue operation indexes modulo bufferSize.
 */
void InitQueue (sQueue *queue, int bufferSize) {
    if (bufferSize <= 0) {
        fprintf(stderr, "bftee: invalid buffer size %d\n", bufferSize);
        exit(EXIT_FAILURE);
    }
    queue->data = calloc((size_t)bufferSize, sizeof *queue->data);
    if (queue->data == NULL) {
        perror("bftee: calloc");
        exit(EXIT_FAILURE);
    }
    queue->bufferSize = bufferSize;
    queue->start = 0;
    queue->end = 0;
    queue->active = 0;
    queue->maxUse = 0;
    queue->drops = 0;
    queue->sWrites = 0;
    queue->pWrites = 0;
}
/*
 * Queue the unwritten tail of *p, i.e. bytes [offset, p->bytes).
 * `offset` is the result of a write() attempt: a negative value (failed
 * write) is treated as 0, and offset == p->bytes means nothing is left.
 * When the ring is full the oldest block is overwritten and counted in drops.
 * Exits if offset exceeds p->bytes (internal inconsistency).
 */
void PushToQueue(sQueue *queue, sBuffer *p, int offset)
{
    sBuffer *slot;
    int remaining;

    if (offset < 0) offset = 0;
    if (offset > p->bytes) {
        /* Not an errno condition, so report with fprintf rather than perror. */
        fprintf(stderr, "bftee: got more bytes to buffer than we read\n");
        exit(EXIT_FAILURE);
    }
    remaining = p->bytes - offset;
    if (remaining == 0) return;

    slot = &queue->data[queue->end];
    memcpy(slot->data, p->data + offset, (size_t)remaining);
    slot->bytes = remaining;
    queue->end = (queue->end + 1) % queue->bufferSize;
    if (queue->active < queue->bufferSize) {
        queue->active++;
        if (queue->active > queue->maxUse) queue->maxUse = queue->active;
    } else {
        /* Ring was full: the slot just written was the oldest one; skip past it. */
        queue->start = (queue->start + 1) % queue->bufferSize;
        queue->drops++;
    }
}
/*
 * Remove the oldest queued block and return a pointer to it, or NULL when
 * the queue is empty. The pointer refers to the ring slot itself and stays
 * valid only until a later PushToQueue reuses that slot.
 */
sBuffer *RetrieveFromQueue(sQueue *queue)
{
    sBuffer *head;

    if (!queue->active) { return NULL; }
    /* Capture the head before advancing; indexing after the advance returned the next block. */
    head = &queue->data[queue->start];
    queue->start = (queue->start + 1) % queue->bufferSize;
    queue->active--;
    return head;
}
/*
 * Look at the oldest queued block without removing it.
 * Returns NULL when nothing is queued.
 */
sBuffer *PeakAtQueue(sQueue *queue)
{
    return queue->active ? queue->data + queue->start : NULL;
}
/*
 * Discard the first `bytes` bytes of the oldest queued block, after a partial
 * write of that block. If that consumes the whole block, the block is removed.
 * Does nothing for bytes <= 0 or an empty queue; exits if asked to discard
 * more than the block holds.
 */
void ShrinkInQueue(sQueue *queue, int bytes) {
    sBuffer *head;

    if (bytes <= 0 || queue->active <= 0) return;
    head = &queue->data[queue->start];
    if (bytes > head->bytes) {
        /* Not an errno condition, so report with fprintf rather than perror. */
        fprintf(stderr, "bftee: we wrote more than we had - this should never happen\n");
        exit(EXIT_FAILURE);
    }
    if (bytes == head->bytes) {
        DelFromQueue(queue);
        return;
    }
    /* Slide the unwritten remainder to the front of the block. */
    memmove(head->data, head->data + bytes, (size_t)(head->bytes - bytes));
    head->bytes -= bytes;
}
/*
 * Drop the oldest queued block; a no-op when the queue is empty.
 */
void DelFromQueue(sQueue *queue)
{
    if (queue->active <= 0)
        return;
    queue->active--;
    queue->start = (queue->start + 1) % queue->bufferSize;
}
/* Append the NUL-terminated string s to buf[*len .. cap); truncates silently. */
static void sigAppendStr(char *buf, size_t cap, size_t *len, const char *s)
{
    while (*s != '\0' && *len < cap)
        buf[(*len)++] = *s++;
}

/* Append the decimal form of v to buf[*len .. cap) without using stdio. */
static void sigAppendInt(char *buf, size_t cap, size_t *len, int v)
{
    char digits[16];
    size_t n = 0;
    unsigned int u = (v < 0) ? 0u - (unsigned int)v : (unsigned int)v;

    do {
        digits[n++] = (char)('0' + u % 10u);
        u /= 10u;
    } while (u != 0u);
    if (v < 0 && *len < cap)
        buf[(*len)++] = '-';
    while (n > 0 && *len < cap)
        buf[(*len)++] = digits[--n];
}

/*
 * SIGUSR1 handler: print queue statistics to stderr as
 * "Buffer use: active (maxUse/bufferSize), STDOUT: sWrites PIPE: pWrites:drops".
 * Formats by hand and emits with write(2), because fprintf is not
 * async-signal-safe; errno is preserved so the interrupted code is unaffected.
 */
static void sigUSR1(int signo) {
    char msg[160];
    size_t len = 0;
    int savedErrno = errno;
    ssize_t ignored;

    (void)signo;
    sigAppendStr(msg, sizeof msg, &len, "Buffer use: ");
    sigAppendInt(msg, sizeof msg, &len, queue.active);
    sigAppendStr(msg, sizeof msg, &len, " (");
    sigAppendInt(msg, sizeof msg, &len, queue.maxUse);
    sigAppendStr(msg, sizeof msg, &len, "/");
    sigAppendInt(msg, sizeof msg, &len, queue.bufferSize);
    sigAppendStr(msg, sizeof msg, &len, "), STDOUT: ");
    sigAppendInt(msg, sizeof msg, &len, queue.sWrites);
    sigAppendStr(msg, sizeof msg, &len, " PIPE: ");
    sigAppendInt(msg, sizeof msg, &len, queue.pWrites);
    sigAppendStr(msg, sizeof msg, &len, ":");
    sigAppendInt(msg, sizeof msg, &len, queue.drops);
    sigAppendStr(msg, sizeof msg, &len, "\n");

    ignored = write(STDERR_FILENO, msg, len); /* best effort; nothing to do on failure */
    (void)ignored;
    errno = savedErrno;
}
/*
 * SIGINT/SIGTERM handler. The first signal sets `quit`, which ends main's
 * read loop so the queued data is flushed to the pipe. A second signal
 * terminates immediately, using _exit() because exit() (atexit handlers,
 * stdio flushing) is not async-signal-safe.
 */
static void sigINT(int signo) {
    (void)signo;
    quit++;
    if (quit > 1) _exit(EXIT_FAILURE);
}
这个版本增加了一个可选参数,用于指定要缓冲的管道块的数量。我的示例调用现在看起来像这样:
some_process | bftee >(onlineAnalysis.pl > results) 16384 | gzip > raw_data.gz
这样在开始丢弃数据之前,最多可以缓冲16384个块。这大约会额外占用32 MB内存,但……谁在意呢?
当然,在实际环境中,我使用命名管道,以便根据需要进行连接和断开连接。它看起来像这样:
mkfifo named_pipe
some_process | bftee named_pipe 16384 | gzip > raw_data.gz &
cat named_pipe | onlineAnalysis.pl > results
此外,该进程对信号的反应如下:
SIGUSR1 -> 将计数器打印到 STDERR
SIGTERM、SIGINT -> 第一次收到时退出主循环,并把缓冲区中剩余的数据刷新到管道;第二次收到时立即终止程序。
也许这会在将来帮助某些人...
祝好