高エネルギー物理学実験におけるデータ収集

機能概要

データ収集プログラムを開発する上で、簡単なデータ生成プログラムが欲しくなる。以下のプログラムは、TCP/IP で SERVICE_PORT に接続すると、データを文字列で垂れ流すプログラムである。データは値が 0 - MAX_DATVAL (0 と MAX_DATVAL を含む)の一様乱数で、<CR><LF> によって区切られている。(従って、telnet で動作確認できる)。

利用法

このデータ一つ一つを一つの測定器の一つのチャネルデータだとしてもよいし、いくつかまとめて異なる分布のデータを生成するのに使ってもよいだろう。とにかく接続すればデータが垂れ流されてくるので、使い道は色々考えられる。複数接続も可能なので、複数入力のテストに使うことも考えられる。

改良をした方がよい点

簡単のため、SERVICE_PORT と MAX_DATVAL を固定してあるが、これらを起動パラメータとして与えられるようにすれば、より使いやすくなるだろう。また、binary データの送信も行えるようにすると利用範囲が広がるだろう。

プログラム技法

プログラムは接続を受け入れたら、pthread_create() を使って thread を生成している。この時、detached mode で起動するように指定してある。従って各 thread は終了時点で資源は自動的に解放される(自動消滅する)はずである。

Thread は data が送れなくなるまで続く。すなわち、client 側が接続を切るまで続く。

一様乱数生成部分は thread 間で共有するようにしているので、thread 間で競合しないよう mutex で排他制御している(rand() が thread safe かどうかわからないので)。排他制御を乱数生成の関数 myintrand() の中でやるようにすれば、呼び出し側は何も考えなくてよいので使いやすいが、将来一度に多数の乱数を発生させるなどの場合もありうるかも知れないので、その場合に効率よくなるような方法がとれるように排他制御を myintrand() の外に置いた。この辺は、もう少し考察して決めた方がよいだろう。

ソースコード

/* datsrv.c
 *
 * Windows (MinGW + pthreads for win32)
 *	gcc -DWIN32 datsrv.c -lpthreadGC2 -lws2_32
 *
 * Linux
 *	gcc datsrv.c -lpthread
 *
*/

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#ifdef WIN32
#include <winsock2.h>
typedef int socklen_t;
#else
#include <unistd.h>
#include <netinet/in.h>
#include <sys/types.h>
#include <sys/socket.h>
#endif	/* WIN32 */
#include <pthread.h>

#define SERVICE_PORT 8881
#define MAX_DATVAL 4095
#define BACKLOG 5

volatile int runflag;
const char* pname;

pthread_mutex_t randmutex = PTHREAD_MUTEX_INITIALIZER;

int startnetlib(void)
{
#ifdef WIN32
	WORD wVersion;
	WSADATA wsaData;
	wVersion = MAKEWORD(2,2);
	if(WSAStartup(wVersion, &wsaData) != 0)
		return 0;
#endif	/* WIN32 */
	return 1;
}

int stopnetlib(void)
{
#ifdef WIN32
	WSACleanup();
#endif	/* WIN32 */
	return 1;
}

#ifndef WIN32
int closesocket(int s)
{
	return close(s);
}
#endif	/* WIN32 */

static int myintrand(int n1, int n2)
{
	int j = (int)((((double)(n2 - n1) + 1.0)*rand())
	 	/ (RAND_MAX + 1.0)) + n1;
	return j;
}

void* data_service(void* arg)
{
	int* s;
	int t;
	int datval;
	char msg[32];

	s = (int*)arg;
	t = *s;		/* accepted socket */
	while(1)
	{
		pthread_mutex_lock(&randmutex);
		datval = myintrand(0,MAX_DATVAL);
		pthread_mutex_unlock(&randmutex);
		sprintf(msg, "%d\r\n\000", datval);
		if(send(t, msg, strlen(msg), 0) < 0)
			break;
	}
 	closesocket(t);
	return 0;
}

int main(int argc, char* argv[])
{
	int s;	/* server socket */
	int t;	/* accepted socket */
	struct sockaddr_in sa;	/* server address */
	struct sockaddr_in ca;	/* client address */
	socklen_t calen;	/* client address length */

	pthread_t thid;	/* thread identifier */
	pthread_attr_t thattr;	/* thread attributes for pthread_create */

	/* set the program name */
	pname = argv[0];
	/* set runflag */
	runflag = 1;
	/* Initialize random number generator */
	srand((int)clock());

	/* Initialize the thread attributes */
	if(pthread_attr_init(&thattr) != 0)
	{
		fprintf(stderr, "%s: Can't initialize the thread attributes\n", pname);
		return 0;
	}
	if(pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_DETACHED) != 0)
	{
		fprintf(stderr, "%s: Can't set the thread attributes in DETACHED state\n", pname);
		return 0;
	}

	/* Start netlibrary (for WINSOCK, dummy in Linux case) */
	if(!startnetlib())
	{
		fprintf(stderr, "%s: can't start network library.\n", pname);
		return 0;
	}

	/* Create the socket */
	if((s = socket(AF_INET, SOCK_STREAM, 0)) == -1)
	{
		fprintf(stderr, "%s: Can't create stream socket\n", pname);
		return 0;
	}

	/* Bind the service address to the socket */
	memset((char*)&sa, 0, sizeof(sa));
	sa.sin_family = AF_INET;
	sa.sin_port = htons( SERVICE_PORT );
	sa.sin_addr.s_addr = htonl( INADDR_ANY );

	if(bind(s, (const struct sockaddr*)&sa, sizeof(sa)) == -1)
	{
		closesocket(s);
		fprintf(stderr, "%s: Can't bind local address\n", pname);
		stopnetlib();
		return 0;
	}

	/* Set the socket in listen state */
	if(listen(s, BACKLOG) == -1)
	{
		closesocket(s);
		fprintf(stderr, "%s: listen error\n", pname);
		stopnetlib();
		return 0;
	}

	/* Service loop. Do the service for each accepted socket. */
	while(runflag)
	{
		calen = sizeof(ca);
		if((t = accept(s, (struct sockaddr*)&ca, &calen)) == -1)
		{
			fprintf(stderr, "%s: accept error\n", pname);
			runflag = 0;
		}
		else
		{
			if(pthread_create(&thid, &thattr, data_service, &t) != 0)
			{
				fprintf(stderr, "%s: can't create thread\n", pname);
				closesocket(t);
			}
		}
	}
	closesocket(s);
	stopnetlib();
	return 0;
}

トップ   編集 凍結 差分 バックアップ 添付 複製 名前変更 リロード   新規 一覧 検索 最終更新   ヘルプ   最終更新のRSS
Last-modified: 2008-07-10 (木) 10:21:36