/* * Library for Posix async read operations with hints. * Author: Don Capps * Company: Iozone * Date: 4/24/1998 * * Two models are supported. First model is a replacement for read() where the async * operations are performed and the requested data is bcopy()-ed back into the users * buffer. The second model is a new version of read() where the caller does not * supply the address of the buffer but instead is returned an address to the * location of the data. The second model eliminates a bcopy from the path. * * To use model #1: * 1. Call async_init(&pointer_on_stack,fd,direct_flag); * The fd is the file descriptor for the async operations. * The direct_flag sets VX_DIRECT * * 2. Call async_read(gc, fd, ubuffer, offset, size, stride, max, depth) * Where: * gc ............ is the pointer on the stack * fd ............ is the file descriptor * ubuffer ....... is the address of the user buffer. * offset ........ is the offset in the file to begin reading * size .......... is the size of the transfer. * stride ........ is the distance, in size units, to space the async reads. * max ........... is the max size of the file to be read. * depth ......... is the number of async operations to perform. * * 3. Call end_async(gc) when finished. * Where: * gc ............ is the pointer on the stack. * * To use model #2: * 1. Call async_init(&pointer_on_stack,fd,direct_flag); * The fd is the file descriptor for the async operations. * The direct_flag sets VX_DIRECT * 2. Call async_read(gc, fd, &ubuffer, offset, size, stride, max, depth) * Where: * gc ............ is the pointer on the stack * fd ............ is the file descriptor * ubuffer ....... is the address of a pointer that will be filled in * by the async library. * offset ........ is the offset in the file to begin reading * size .......... is the size of the transfer. * stride ........ is the distance, in size units, to space the async reads. * max ........... is the max size of the file to be read. * depth ......... is the number of async operations to perform. * * 3. Call async_release(gc) when finished with the data that was returned. * This allows the async library to reuse the memory that was filled in * and returned to the user. * * 4. Call end_async(gc) when finished. * Where: * gc ............ is the pointer on the stack. * * To use model #1: (WRITES) * 1. Call async_init(&pointer_on_stack,fd,direct_flag); * The fd is the file descriptor for the async operations. * * 2. Call async_write(gc, fd, ubuffer, size, offset, depth) * Where: * gc ............ is the pointer on the stack * fd ............ is the file descriptor * ubuffer ....... is the address of the user buffer. * size .......... is the size of the transfer. * offset ........ is the offset in the file to begin reading * depth ......... is the number of async operations to perform. * * 4. Call end_async(gc) when finished. * Where: * gc ............ is the pointer on the stack. * * Notes: * The intended use is to replace calls to read() with calls to * async_read() and allow the user to make suggestions on * what kind of async read-ahead would be nice to have. * The first transfer requested is guarenteed to be complete * before returning to the caller. The async operations will * be started and will also be guarenteed to have completed * if the next call specifies its first request to be one * that was previously performed with an async operation. * * The async_read_no_copy() function allows the async operations * to return the data to the user and not have to perform * a bcopy of the data back into the user specified buffer * location. This model is faster but assumes that the user * application has been modified to work with this model. * * The async_write() is intended to enhance the performance of * initial writes to a file. This is the slowest case in the write * path as it must perform meta-data allocations and wait. */ #include #include #if defined(solaris) || defined(linux) || defined(SCO_Unixware_gcc) #else #include #endif #include #include #ifndef bsd4_4 #include #endif #ifdef VXFS #include #endif #if defined(OSFV5) || defined(linux) #include #endif #if defined(linux) #include #include #include #endif #if (defined(solaris) && defined(__LP64__)) || defined(__s390x__) || defined(FreeBSD) /* If we are building for 64-bit Solaris, all functions that return pointers * must be declared before they are used; otherwise the compiler will assume * that they return ints and the top 32 bits of the pointer will be lost, * causing segmentation faults. The following includes take care of this. * It should be safe to add these for all other OSs too, but we're only * doing it for Solaris now in case another OS turns out to be a special case. */ #include #include #include /* For the BSD string functions */ #endif void mbcopy(char *source, char *dest, size_t len); #if !defined(solaris) && !defined(off64_t) && !defined(_OFF64_T) && !defined(__off64_t_defined) && !defined(SCO_Unixware_gcc) typedef long long off64_t; #endif #if defined(OSFV5) #include #endif extern long long page_size; extern int one; /* * Internal cache entrys. Each entry on the global * cache, pointed to by async_init(gc) will be of * this structure type. */ char version[] = "Libasync Version $Revision$"; struct cache_ent { struct aiocb myaiocb; /* For use in small file mode */ #ifdef _LARGEFILE64_SOURCE #if defined(__CrayX1__) aiocb64_t myaiocb64; /* For use in large file mode */ #else struct aiocb64 myaiocb64; /* For use in large file mode */ #endif #endif long long fd; /* File descriptor */ long long size; /* Size of the transfer */ struct cache_ent *forward; /* link to next element on cache list */ struct cache_ent *back; /* link to previous element on the cache list */ long long direct; /* flag to indicate if the buffer should be */ /* de-allocated by library */ char *real_address; /* Real address to free */ volatile void *oldbuf; /* Used for firewall to prevent in flight */ /* accidents */ int oldfd; /* Used for firewall to prevent in flight */ /* accidents */ size_t oldsize; /* Used for firewall to prevent in flight */ /* accidents */ }; /* * Head of the cache list */ struct cache { struct cache_ent *head; /* Head of cache list */ struct cache_ent *tail; /* tail of cache list */ struct cache_ent *inuse_head; /* head of in-use list */ long long count; /* How many elements on the cache list */ struct cache_ent *w_head; /* Head of cache list */ struct cache_ent *w_tail; /* tail of cache list */ long long w_count; /* How many elements on the write list */ }; long long max_depth; extern int errno; struct cache_ent *alloc_cache(); struct cache_ent *incache(); void async_init(); void end_async(); int async_suspend(); int async_read(); void takeoff_cache(); void del_cache(); void async_release(); void putoninuse(); void takeoffinuse(); struct cache_ent *allocate_write_buffer(); size_t async_write(); void async_wait_for_write(); void async_put_on_write_queue(); void async_write_finish(); /* On Solaris _LP64 will be defined by if we're compiling * as a 64-bit binary. Make sure that __LP64__ gets defined in this case, * too -- it should be defined on the compiler command line, but let's * not rely on this. */ #if defined(_LP64) #if !defined(__LP64__) #define __LP64__ #endif #endif /***********************************************/ /* Initialization routine to setup the library */ /***********************************************/ void async_init(gc,fd,flag) struct cache **gc; int fd; int flag; { #ifdef VXFS if(flag) ioctl(fd,VX_SETCACHE,VX_DIRECT); #endif if(*gc) { printf("Warning calling async_init two times ?\n"); return; } *gc=(struct cache *)malloc((size_t)sizeof(struct cache)); if(*gc == 0) { printf("Malloc failed\n"); exit(174); } bzero(*gc,sizeof(struct cache)); #if defined(__AIX__) || defined(SCO_Unixware_gcc) max_depth=500; #else max_depth=sysconf(_SC_AIO_MAX); #endif } /***********************************************/ /* Tear down routine to shutdown the library */ /***********************************************/ void end_async(gc) struct cache *gc; { del_cache(gc); async_write_finish(gc); free((void *)gc); } /***********************************************/ /* Wait for a request to finish */ /***********************************************/ int async_suspend(struct cache_ent *ce) { #ifdef _LARGEFILE64_SOURCE #ifdef __LP64__ const struct aiocb * const cblist[1] = {&ce->myaiocb}; #else const struct aiocb64 * const cblist[1] = {&ce->myaiocb64}; #endif #else const struct aiocb * const cblist[1] = {&ce->myaiocb}; #endif #ifdef _LARGEFILE64_SOURCE #ifdef __LP64__ return aio_suspend(cblist, 1, NULL); #else return aio_suspend64(cblist, 1, NULL); #endif #else return aio_suspend(cblist, 1, NULL); #endif } /************************************************************************* * This routine is a generic async reader assist funtion. It takes * the same calling parameters as read() but also extends the * interface to include: * stride ..... For the async reads, what is the distance, in size units, * to space the reads. Note: Stride of 0 indicates that * you do not want any read-ahead. * max ..... What is the maximum file offset for this operation. * depth ..... How much read-ahead do you want. * * The calls to this will guarentee to complete the read() operation * before returning to the caller. The completion may occur in two * ways. First the operation may be completed by calling aio_read() * and then waiting for it to complete. Second the operation may be * completed by copying the data from a cache of previously completed * async operations. * In the event the read to be satisfied is not in the cache then a * series of async operations will be scheduled and then the first * async read will be completed. In the event that the read() can be * satisfied from the cache then the data is copied back to the * user buffer and a series of async reads will be initiated. If a * read is issued and the cache contains data and the read can not * be satisfied from the cache, then the cache is discarded, and * a new cache is constructed. * Note: All operations are aio_read(). The series will be issued * as asyncs in the order requested. After all are in flight * then the code will wait for the manditory first read. *************************************************************************/ int async_read(gc, fd, ubuffer, offset, size, stride, max, depth) struct cache *gc; long long fd; char *ubuffer; off64_t offset; long long size; long long stride; off64_t max; long long depth; { off64_t a_offset,r_offset; long long a_size; struct cache_ent *ce,*first_ce=0; long long i; ssize_t retval=0; ssize_t ret; long long start = 0; long long del_read=0; a_offset=offset; a_size = size; /* * Check to see if it can be completed from the cache */ if((ce=(struct cache_ent *)incache(gc,fd,offset,size))) { #ifdef _LARGEFILE64_SOURCE #ifdef __LP64__ while((ret=aio_error(&ce->myaiocb))== EINPROGRESS) { async_suspend(ce); } #else while((ret=aio_error64(&ce->myaiocb64))== EINPROGRESS) { async_suspend(ce); } #endif #else while((ret=aio_error(&ce->myaiocb))== EINPROGRESS) { async_suspend(ce); } #endif if(ret) { printf("aio_error 1: ret %d %d\n",ret,errno); } #ifdef _LARGEFILE64_SOURCE #ifdef __LP64__ retval=aio_return(&ce->myaiocb); #else #if defined(__CrayX1__) retval=aio_return64((aiocb64_t *)&ce->myaiocb64); #else retval=aio_return64((struct aiocb64 *)&ce->myaiocb64); #endif #endif #else retval=aio_return(&ce->myaiocb); #endif if(retval > 0) { #ifdef _LARGEFILE64_SOURCE #ifdef __LP64__ mbcopy((char *)ce->myaiocb.aio_buf,(char *)ubuffer,(size_t)retval); #else mbcopy((char *)ce->myaiocb64.aio_buf,(char *)ubuffer,(size_t)retval); #endif #else mbcopy((char *)ce->myaiocb.aio_buf,(char *)ubuffer,(size_t)retval); #endif } #ifdef _LARGEFILE64_SOURCE #ifdef __LP64__ if(retval < ce->myaiocb.aio_nbytes) #else if(retval < ce->myaiocb64.aio_nbytes) #endif #else if(retval < ce->myaiocb.aio_nbytes) #endif { printf("aio_return error1: ret %d %d\n",retval,errno); #ifdef _LARGEFILE64_SOURCE #ifdef __LP64__ printf("aio_return error1: fd %d offset %ld buffer %lx size %d Opcode %d\n", ce->myaiocb.aio_fildes, ce->myaiocb.aio_offset, (long)(ce->myaiocb.aio_buf), ce->myaiocb.aio_nbytes, ce->myaiocb.aio_lio_opcode #else printf("aio_return error1: fd %d offset %lld buffer %lx size %d Opcode %d\n", ce->myaiocb64.aio_fildes, ce->myaiocb64.aio_offset, (long)(ce->myaiocb64.aio_buf), ce->myaiocb64.aio_nbytes, ce->myaiocb64.aio_lio_opcode #endif #else printf("aio_return error1: fd %d offset %d buffer %lx size %d Opcode %d\n", ce->myaiocb.aio_fildes, ce->myaiocb.aio_offset, (long)(ce->myaiocb.aio_buf), ce->myaiocb.aio_nbytes, ce->myaiocb.aio_lio_opcode #endif ); } ce->direct=0; takeoff_cache(gc,ce); }else { /* * Clear the cache and issue the first request async() */ del_cache(gc); del_read++; first_ce=alloc_cache(gc,fd,offset,size,(long long)LIO_READ); again: #ifdef _LARGEFILE64_SOURCE #ifdef __LP64__ ret=aio_read(&first_ce->myaiocb); #else ret=aio_read64(&first_ce->myaiocb64); #endif #else ret=aio_read(&first_ce->myaiocb); #endif if(ret!=0) { if(errno==EAGAIN) goto again; else printf("error returned from aio_read(). Ret %d errno %d\n",ret,errno); } } if(stride==0) /* User does not want read-ahead */ goto out; if(a_offset<0) /* Before beginning of file */ goto out; if(a_offset+size>max) /* After end of file */ goto out; if(depth >=(max_depth-1)) depth=max_depth-1; if(depth==0) goto out; if(gc->count > 1) start=depth-1; for(i=start;i max) continue; if((ce=incache(gc,fd,r_offset,a_size))) continue; ce=alloc_cache(gc,fd,r_offset,a_size,(long long)LIO_READ); #ifdef _LARGEFILE64_SOURCE #ifdef __LP64__ ret=aio_read(&ce->myaiocb); #else ret=aio_read64(&ce->myaiocb64); #endif #else ret=aio_read(&ce->myaiocb); #endif if(ret!=0) { takeoff_cache(gc,ce); break; } } out: if(del_read) /* Wait for the first read to complete */ { #ifdef _LARGEFILE64_SOURCE #ifdef __LP64__ while((ret=aio_error(&first_ce->myaiocb))== EINPROGRESS) { async_suspend(first_ce); } #else while((ret=aio_error64(&first_ce->myaiocb64))== EINPROGRESS) { async_suspend(first_ce); } #endif #else while((ret=aio_error(&first_ce->myaiocb))== EINPROGRESS) { async_suspend(first_ce); } #endif if(ret) printf("aio_error 2: ret %d %d\n",ret,errno); #ifdef _LARGEFILE64_SOURCE #ifdef __LP64__ retval=aio_return(&first_ce->myaiocb); #else retval=aio_return64(&first_ce->myaiocb64); #endif #else retval=aio_return(&first_ce->myaiocb); #endif #ifdef _LARGEFILE64_SOURCE #ifdef __LP64__ if(retval < first_ce->myaiocb.aio_nbytes) #else if(retval < first_ce->myaiocb64.aio_nbytes) #endif #else if(retval < first_ce->myaiocb.aio_nbytes) #endif { printf("aio_return error2: ret %d %d\n",retval,errno); #ifdef _LARGEFILE64_SOURCE #ifdef __LP64__ printf("aio_return error2: fd %d offset %lld buffer %lx size %d Opcode %d\n", first_ce->myaiocb.aio_fildes, first_ce->myaiocb.aio_offset, (long)(first_ce->myaiocb.aio_buf), first_ce->myaiocb.aio_nbytes, first_ce->myaiocb.aio_lio_opcode #else printf("aio_return error2: fd %d offset %lld buffer %lx size %d Opcode %d\n", first_ce->myaiocb64.aio_fildes, first_ce->myaiocb64.aio_offset, (long)(first_ce->myaiocb64.aio_buf), first_ce->myaiocb64.aio_nbytes, first_ce->myaiocb64.aio_lio_opcode #endif #else printf("aio_return error2: fd %d offset %d buffer %lx size %d Opcode %d\n", first_ce->myaiocb.aio_fildes, first_ce->myaiocb.aio_offset, (long)(first_ce->myaiocb.aio_buf), first_ce->myaiocb.aio_nbytes, first_ce->myaiocb.aio_lio_opcode #endif ); } if(retval > 0) { #ifdef _LARGEFILE64_SOURCE #ifdef __LP64__ mbcopy((char *)first_ce->myaiocb.aio_buf,(char *)ubuffer,(size_t)retval); #else mbcopy((char *)first_ce->myaiocb64.aio_buf,(char *)ubuffer,(size_t)retval); #endif #else mbcopy((char *)first_ce->myaiocb.aio_buf,(char *)ubuffer,(size_t)retval); #endif } first_ce->direct=0; takeoff_cache(gc,first_ce); } return((int)retval); } /************************************************************************ * This routine allocates a cache_entry. It contains the * aiocb block as well as linkage for use in the cache mechanism. * The space allocated here will be released after the cache entry * has been consumed. The routine takeoff_cache() will be called * after the data has been copied to user buffer or when the * cache is purged. The routine takeoff_cache() will also release * all memory associated with this cache entry. ************************************************************************/ struct cache_ent * alloc_cache(gc,fd,offset,size,op) struct cache *gc; long long fd,size,op; off64_t offset; { struct cache_ent *ce; long temp; ce=(struct cache_ent *)malloc((size_t)sizeof(struct cache_ent)); if(ce == (struct cache_ent *)0) { printf("Malloc failed\n"); exit(175); } bzero(ce,sizeof(struct cache_ent)); #ifdef _LARGEFILE64_SOURCE #ifdef __LP64__ ce->myaiocb.aio_fildes=(int)fd; ce->myaiocb.aio_offset=(off64_t)offset; ce->real_address = (char *)malloc((size_t)(size+page_size)); temp=(long)ce->real_address; temp = (temp+page_size) & ~(page_size-1); ce->myaiocb.aio_buf=(volatile void *)temp; if(ce->myaiocb.aio_buf == 0) #else ce->myaiocb64.aio_fildes=(int)fd; ce->myaiocb64.aio_offset=(off64_t)offset; ce->real_address = (char *)malloc((size_t)(size+page_size)); temp=(long)ce->real_address; temp = (temp+page_size) & ~(page_size-1); ce->myaiocb64.aio_buf=(volatile void *)temp; if(ce->myaiocb64.aio_buf == 0) #endif #else ce->myaiocb.aio_fildes=(int)fd; ce->myaiocb.aio_offset=(off_t)offset; ce->real_address = (char *)malloc((size_t)(size+page_size)); temp=(long)ce->real_address; temp = (temp+page_size) & ~(page_size-1); ce->myaiocb.aio_buf=(volatile void *)temp; if(ce->myaiocb.aio_buf == 0) #endif { printf("Malloc failed\n"); exit(176); } /*bzero(ce->myaiocb.aio_buf,(size_t)size);*/ #ifdef _LARGEFILE64_SOURCE #ifdef __LP64__ ce->myaiocb.aio_reqprio=0; ce->myaiocb.aio_nbytes=(size_t)size; ce->myaiocb.aio_sigevent.sigev_notify=SIGEV_NONE; ce->myaiocb.aio_lio_opcode=(int)op; #else ce->myaiocb64.aio_reqprio=0; ce->myaiocb64.aio_nbytes=(size_t)size; ce->myaiocb64.aio_sigevent.sigev_notify=SIGEV_NONE; ce->myaiocb64.aio_lio_opcode=(int)op; #endif #else ce->myaiocb.aio_reqprio=0; ce->myaiocb.aio_nbytes=(size_t)size; ce->myaiocb.aio_sigevent.sigev_notify=SIGEV_NONE; ce->myaiocb.aio_lio_opcode=(int)op; #endif ce->fd=(int)fd; ce->forward=0; ce->back=gc->tail; if(gc->tail) gc->tail->forward = ce; gc->tail= ce; if(!gc->head) gc->head=ce; gc->count++; return(ce); } /************************************************************************ * This routine checks to see if the requested data is in the * cache. *************************************************************************/ struct cache_ent * incache(gc,fd,offset,size) struct cache *gc; long long fd,size; off64_t offset; { struct cache_ent *move; if(gc->head==0) { return(0); } move=gc->head; #ifdef _LARGEFILE64_SOURCE #ifdef __LP64__ while(move) { if((move->fd == fd) && (move->myaiocb.aio_offset==(off64_t)offset) && ((size_t)size==move->myaiocb.aio_nbytes)) { return(move); } move=move->forward; } #else while(move) { if((move->fd == fd) && (move->myaiocb64.aio_offset==(off64_t)offset) && ((size_t)size==move->myaiocb64.aio_nbytes)) { return(move); } move=move->forward; } #endif #else while(move) { if((move->fd == fd) && (move->myaiocb.aio_offset==(off_t)offset) && ((size_t)size==move->myaiocb.aio_nbytes)) { return(move); } move=move->forward; } #endif return(0); } /************************************************************************ * This routine removes a specific cache entry from the cache, and * releases all memory associated witht the cache entry (if not direct). *************************************************************************/ void takeoff_cache(gc,ce) struct cache *gc; struct cache_ent *ce; { struct cache_ent *move; long long found; move=gc->head; if(move==ce) /* Head of list */ { gc->head=ce->forward; if(gc->head) gc->head->back=0; else gc->tail = 0; if(!ce->direct) { free((void *)(ce->real_address)); free((void *)ce); } gc->count--; return; } found=0; while(move) { if(move==ce) { if(move->forward) { move->forward->back=move->back; } if(move->back) { move->back->forward=move->forward; } found=1; break; } else { move=move->forward; } } if(gc->head == ce) gc->tail = ce; if(!found) printf("Internal Error in takeoff cache\n"); move=gc->head; if(!ce->direct) { free((void *)(ce->real_address)); free((void *)ce); } gc->count--; } /************************************************************************ * This routine is used to purge the entire cache. This is called when * the cache contains data but the incomming read was not able to * be satisfied from the cache. This indicates that the previous * async read-ahead was not correct and a new pattern is emerging. ************************************************************************/ void del_cache(gc) struct cache *gc; { struct cache_ent *ce; ssize_t ret; ce=gc->head; while(1) { ce=gc->head; if(ce==0) return; #ifdef _LARGEFILE64_SOURCE #ifdef __LP64__ while((ret = aio_cancel(0,&ce->myaiocb))==AIO_NOTCANCELED) #else while((ret = aio_cancel64(0,&ce->myaiocb64))==AIO_NOTCANCELED) #endif #else while((ret = aio_cancel(0,&ce->myaiocb))==AIO_NOTCANCELED) #endif ; #ifdef _LARGEFILE64_SOURCE #ifdef __LP64__ ret = aio_return(&ce->myaiocb); #else ret = aio_return64(&ce->myaiocb64); #endif #else ret = aio_return(&ce->myaiocb); #endif ce->direct=0; takeoff_cache(gc,ce); /* remove from cache */ } } /************************************************************************ * Like its sister async_read() this function performs async I/O for * all buffers but it differs in that it expects the caller to * request a pointer to the data to be returned instead of handing * the function a location to put the data. This will allow the * async I/O to be performed and does not require any bcopy to be * done to put the data back into the location specified by the caller. ************************************************************************/ int async_read_no_copy(gc, fd, ubuffer, offset, size, stride, max, depth) struct cache *gc; long long fd; char **ubuffer; off64_t offset; long long size; long long stride; off64_t max; long long depth; { off64_t a_offset,r_offset; long long a_size; struct cache_ent *ce,*first_ce=0; long long i; ssize_t retval=0; ssize_t ret; long long del_read=0; long long start=0; a_offset=offset; a_size = size; /* * Check to see if it can be completed from the cache */ if((ce=(struct cache_ent *)incache(gc,fd,offset,size))) { #ifdef _LARGEFILE64_SOURCE #ifdef __LP64__ while((ret=aio_error(&ce->myaiocb))== EINPROGRESS) { async_suspend(ce); } #else while((ret=aio_error64(&ce->myaiocb64))== EINPROGRESS) { async_suspend(ce); } #endif #else while((ret=aio_error(&ce->myaiocb))== EINPROGRESS) { async_suspend(ce); } #endif if(ret) printf("aio_error 3: ret %d %d\n",ret,errno); #ifdef _LARGEFILE64_SOURCE #ifdef __LP64__ if(ce->oldbuf != ce->myaiocb.aio_buf || ce->oldfd != ce->myaiocb.aio_fildes || ce->oldsize != ce->myaiocb.aio_nbytes) #else if(ce->oldbuf != ce->myaiocb64.aio_buf || ce->oldfd != ce->myaiocb64.aio_fildes || ce->oldsize != ce->myaiocb64.aio_nbytes) #endif #else if(ce->oldbuf != ce->myaiocb.aio_buf || ce->oldfd != ce->myaiocb.aio_fildes || ce->oldsize != ce->myaiocb.aio_nbytes) #endif printf("It changed in flight\n"); #ifdef _LARGEFILE64_SOURCE #ifdef __LP64__ retval=aio_return(&ce->myaiocb); #else retval=aio_return64(&ce->myaiocb64); #endif #else retval=aio_return(&ce->myaiocb); #endif if(retval > 0) { #ifdef _LARGEFILE64_SOURCE #ifdef __LP64__ *ubuffer=(char *)ce->myaiocb.aio_buf; #else *ubuffer=(char *)ce->myaiocb64.aio_buf; #endif #else *ubuffer=(char *)ce->myaiocb.aio_buf; #endif }else *ubuffer=0; #ifdef _LARGEFILE64_SOURCE #ifdef __LP64__ if(retval < ce->myaiocb.aio_nbytes) #else if(retval < ce->myaiocb64.aio_nbytes) #endif #else if(retval < ce->myaiocb.aio_nbytes) #endif { printf("aio_return error4: ret %d %d\n",retval,errno); #ifdef _LARGEFILE64_SOURCE #ifdef __LP64__ printf("aio_return error4: fd %d offset %lld buffer %lx size %d Opcode %d\n", ce->myaiocb.aio_fildes, ce->myaiocb.aio_offset, (long)(ce->myaiocb.aio_buf), ce->myaiocb.aio_nbytes, ce->myaiocb.aio_lio_opcode #else printf("aio_return error4: fd %d offset %lld buffer %lx size %d Opcode %d\n", ce->myaiocb64.aio_fildes, ce->myaiocb64.aio_offset, (long)(ce->myaiocb64.aio_buf), ce->myaiocb64.aio_nbytes, ce->myaiocb64.aio_lio_opcode #endif #else printf("aio_return error4: fd %d offset %d buffer %lx size %d Opcode %d\n", ce->myaiocb.aio_fildes, ce->myaiocb.aio_offset, (long)(ce->myaiocb.aio_buf), ce->myaiocb.aio_nbytes, ce->myaiocb.aio_lio_opcode #endif ); } ce->direct=1; takeoff_cache(gc,ce); /* do not delete buffer*/ putoninuse(gc,ce); }else { /* * Clear the cache and issue the first request async() */ del_cache(gc); del_read++; first_ce=alloc_cache(gc,fd,offset,size,(long long)LIO_READ); /* allocate buffer */ /*printf("allocated buffer/read %x offset %d\n",first_ce->myaiocb.aio_buf,offset);*/ again: #ifdef _LARGEFILE64_SOURCE #ifdef __LP64__ first_ce->oldbuf=first_ce->myaiocb.aio_buf; first_ce->oldfd=first_ce->myaiocb.aio_fildes; first_ce->oldsize=first_ce->myaiocb.aio_nbytes; ret=aio_read(&first_ce->myaiocb); #else first_ce->oldbuf=first_ce->myaiocb64.aio_buf; first_ce->oldfd=first_ce->myaiocb64.aio_fildes; first_ce->oldsize=first_ce->myaiocb64.aio_nbytes; ret=aio_read64(&first_ce->myaiocb64); #endif #else first_ce->oldbuf=first_ce->myaiocb.aio_buf; first_ce->oldfd=first_ce->myaiocb.aio_fildes; first_ce->oldsize=first_ce->myaiocb.aio_nbytes; ret=aio_read(&first_ce->myaiocb); #endif if(ret!=0) { if(errno==EAGAIN) goto again; else printf("error returned from aio_read(). Ret %d errno %d\n",ret,errno); } } if(stride==0) /* User does not want read-ahead */ goto out; if(a_offset<0) /* Before beginning of file */ goto out; if(a_offset+size>max) /* After end of file */ goto out; if(depth >=(max_depth-1)) depth=max_depth-1; if(depth==0) goto out; if(gc->count > 1) start=depth-1; for(i=start;i max) continue; if((ce=incache(gc,fd,r_offset,a_size))) continue; ce=alloc_cache(gc,fd,r_offset,a_size,(long long)LIO_READ); #ifdef _LARGEFILE64_SOURCE #ifdef __LP64__ ce->oldbuf=ce->myaiocb.aio_buf; ce->oldfd=ce->myaiocb.aio_fildes; ce->oldsize=ce->myaiocb.aio_nbytes; ret=aio_read(&ce->myaiocb); #else ce->oldbuf=ce->myaiocb64.aio_buf; ce->oldfd=ce->myaiocb64.aio_fildes; ce->oldsize=ce->myaiocb64.aio_nbytes; ret=aio_read64(&ce->myaiocb64); #endif #else ce->oldbuf=ce->myaiocb.aio_buf; ce->oldfd=ce->myaiocb.aio_fildes; ce->oldsize=ce->myaiocb.aio_nbytes; ret=aio_read(&ce->myaiocb); #endif if(ret!=0) { takeoff_cache(gc,ce); break; } } out: if(del_read) /* Wait for the first read to complete */ { #ifdef _LARGEFILE64_SOURCE #ifdef __LP64__ while((ret=aio_error(&first_ce->myaiocb))== EINPROGRESS) { async_suspend(first_ce); } #else while((ret=aio_error64(&first_ce->myaiocb64))== EINPROGRESS) { async_suspend(first_ce); } #endif #else while((ret=aio_error(&first_ce->myaiocb))== EINPROGRESS) { async_suspend(first_ce); } #endif if(ret) printf("aio_error 4: ret %d %d\n",ret,errno); #ifdef _LARGEFILE64_SOURCE #ifdef __LP64__ if(first_ce->oldbuf != first_ce->myaiocb.aio_buf || first_ce->oldfd != first_ce->myaiocb.aio_fildes || first_ce->oldsize != first_ce->myaiocb.aio_nbytes) printf("It changed in flight2\n"); retval=aio_return(&first_ce->myaiocb); #else if(first_ce->oldbuf != first_ce->myaiocb64.aio_buf || first_ce->oldfd != first_ce->myaiocb64.aio_fildes || first_ce->oldsize != first_ce->myaiocb64.aio_nbytes) printf("It changed in flight2\n"); retval=aio_return64(&first_ce->myaiocb64); #endif #else if(first_ce->oldbuf != first_ce->myaiocb.aio_buf || first_ce->oldfd != first_ce->myaiocb.aio_fildes || first_ce->oldsize != first_ce->myaiocb.aio_nbytes) printf("It changed in flight2\n"); retval=aio_return(&first_ce->myaiocb); #endif #ifdef _LARGEFILE64_SOURCE #ifdef __LP64__ if(retval < first_ce->myaiocb.aio_nbytes) #else if(retval < first_ce->myaiocb64.aio_nbytes) #endif #else if(retval < first_ce->myaiocb.aio_nbytes) #endif { printf("aio_return error5: ret %d %d\n",retval,errno); #ifdef _LARGEFILE64_SOURCE #ifdef __LP64__ printf("aio_return error5: fd %d offset %lld buffer %lx size %d Opcode %d\n", first_ce->myaiocb.aio_fildes, first_ce->myaiocb.aio_offset, (long)(first_ce->myaiocb.aio_buf), first_ce->myaiocb.aio_nbytes, first_ce->myaiocb.aio_lio_opcode #else printf("aio_return error5: fd %d offset %lld buffer %lx size %d Opcode %d\n", first_ce->myaiocb64.aio_fildes, first_ce->myaiocb64.aio_offset, (long)(first_ce->myaiocb64.aio_buf), first_ce->myaiocb64.aio_nbytes, first_ce->myaiocb64.aio_lio_opcode #endif #else printf("aio_return error5: fd %d offset %ld buffer %lx size %d Opcode %d\n", first_ce->myaiocb.aio_fildes, first_ce->myaiocb.aio_offset, (long)(first_ce->myaiocb.aio_buf), first_ce->myaiocb.aio_nbytes, first_ce->myaiocb.aio_lio_opcode #endif ); } if(retval > 0) { #ifdef _LARGEFILE64_SOURCE #ifdef __LP64__ *ubuffer=(char *)first_ce->myaiocb.aio_buf; #else *ubuffer=(char *)first_ce->myaiocb64.aio_buf; #endif #else *ubuffer=(char *)first_ce->myaiocb.aio_buf; #endif }else *ubuffer=(char *)0; first_ce->direct=1; /* do not delete the buffer */ takeoff_cache(gc,first_ce); putoninuse(gc,first_ce); } return((int)retval); } /************************************************************************ * The caller is now finished with the data that was provided so * the library is now free to return the memory to the pool for later * reuse. ************************************************************************/ void async_release(gc) struct cache *gc; { takeoffinuse(gc); } /************************************************************************ * Put the buffer on the inuse list. When the user is finished with * the buffer it will call back into async_release and the items on the * inuse list will be deallocated. ************************************************************************/ void putoninuse(gc,entry) struct cache *gc; struct cache_ent *entry; { if(gc->inuse_head) entry->forward=gc->inuse_head; else entry->forward=0; gc->inuse_head=entry; } /************************************************************************ * This is called when the application is finished with the data that * was provided. The memory may now be returned to the pool. ************************************************************************/ void takeoffinuse(gc) struct cache *gc; { struct cache_ent *ce; if(gc->inuse_head==0) printf("Takeoffinuse error\n"); ce=gc->inuse_head; gc->inuse_head=gc->inuse_head->forward; if(gc->inuse_head !=0) printf("Error in take off inuse\n"); free((void*)(ce->real_address)); free(ce); } /************************************************************************* * This routine is a generic async writer assist funtion. It takes * the same calling parameters as write() but also extends the * interface to include: * * offset ..... offset in the file. * depth ..... How much read-ahead do you want. * *************************************************************************/ size_t async_write(gc,fd,buffer,size,offset,depth) struct cache *gc; long long fd,size; char *buffer; off64_t offset; long long depth; { struct cache_ent *ce; size_t ret; ce=allocate_write_buffer(gc,fd,offset,size,(long long)LIO_WRITE,depth,0LL,(char *)0,(char *)0); ce->direct=0; /* not direct. Lib supplies buffer and must free it */ #ifdef _LARGEFILE64_SOURCE #ifdef __LP64__ mbcopy(buffer,(char *)(ce->myaiocb.aio_buf),(size_t)size); #else mbcopy(buffer,(char *)(ce->myaiocb64.aio_buf),(size_t)size); #endif #else mbcopy(buffer,(char *)(ce->myaiocb.aio_buf),(size_t)size); #endif async_put_on_write_queue(gc,ce); /* printf("asw: fd %d offset %lld, size %d\n",ce->myaiocb64.aio_fildes, ce->myaiocb64.aio_offset, ce->myaiocb64.aio_nbytes); */ again: #ifdef _LARGEFILE64_SOURCE #ifdef __LP64__ ret=aio_write(&ce->myaiocb); #else ret=aio_write64(&ce->myaiocb64); #endif #else ret=aio_write(&ce->myaiocb); #endif if(ret==-1) { if(errno==EAGAIN) { async_wait_for_write(gc); goto again; } if(errno==0) { /* Compensate for bug in async library */ async_wait_for_write(gc); goto again; } else { printf("Error in aio_write: ret %d errno %d count %lld\n",ret,errno,gc->w_count); /* printf("aio_write_no_copy: fd %d buffer %x offset %lld size %d\n", ce->myaiocb64.aio_fildes, ce->myaiocb64.aio_buf, ce->myaiocb64.aio_offset, ce->myaiocb64.aio_nbytes); */ exit(177); } } return((ssize_t)size); } /************************************************************************* * Allocate a write aiocb and write buffer of the size specified. Also * put some extra buffer padding so that VX_DIRECT can do its job when * needed. *************************************************************************/ struct cache_ent * allocate_write_buffer(gc,fd,offset,size,op,w_depth,direct,buffer,free_addr) struct cache *gc; long long fd,size,op; off64_t offset; long long w_depth; long long direct; char *buffer,*free_addr; { struct cache_ent *ce; long temp; if(fd==0LL) { printf("Setting up write buffer insane\n"); exit(178); } if(gc->w_count > w_depth) async_wait_for_write(gc); ce=(struct cache_ent *)malloc((size_t)sizeof(struct cache_ent)); if(ce == (struct cache_ent *)0) { printf("Malloc failed 1\n"); exit(179); } bzero(ce,sizeof(struct cache_ent)); #ifdef _LARGEFILE64_SOURCE #ifdef __LP64__ ce->myaiocb.aio_fildes=(int)fd; ce->myaiocb.aio_offset=(off64_t)offset; if(!direct) { ce->real_address = (char *)malloc((size_t)(size+page_size)); temp=(long)ce->real_address; temp = (temp+page_size) & ~(page_size-1); ce->myaiocb.aio_buf=(volatile void *)temp; }else { ce->myaiocb.aio_buf=(volatile void *)buffer; ce->real_address=(char *)free_addr; } if(ce->myaiocb.aio_buf == 0) #else ce->myaiocb64.aio_fildes=(int)fd; ce->myaiocb64.aio_offset=(off64_t)offset; if(!direct) { ce->real_address = (char *)malloc((size_t)(size+page_size)); temp=(long)ce->real_address; temp = (temp+page_size) & ~(page_size-1); ce->myaiocb64.aio_buf=(volatile void *)temp; } else { ce->myaiocb64.aio_buf=(volatile void *)buffer; ce->real_address=(char *)free_addr; } if(ce->myaiocb64.aio_buf == 0) #endif #else ce->myaiocb.aio_fildes=(int)fd; ce->myaiocb.aio_offset=(off_t)offset; if(!direct) { ce->real_address = (char *)malloc((size_t)(size+page_size)); temp=(long)ce->real_address; temp = (temp+page_size) & ~(page_size-1); ce->myaiocb.aio_buf=(volatile void *)temp; } else { ce->myaiocb.aio_buf=(volatile void *)buffer; ce->real_address=(char *)free_addr; } if(ce->myaiocb.aio_buf == 0) #endif { printf("Malloc failed 2\n"); exit(180); } #ifdef _LARGEFILE64_SOURCE #ifdef __LP64__ ce->myaiocb.aio_reqprio=0; ce->myaiocb.aio_nbytes=(size_t)size; ce->myaiocb.aio_sigevent.sigev_notify=SIGEV_NONE; ce->myaiocb.aio_lio_opcode=(int)op; #else ce->myaiocb64.aio_reqprio=0; ce->myaiocb64.aio_nbytes=(size_t)size; ce->myaiocb64.aio_sigevent.sigev_notify=SIGEV_NONE; ce->myaiocb64.aio_lio_opcode=(int)op; #endif #else ce->myaiocb.aio_reqprio=0; ce->myaiocb.aio_nbytes=(size_t)size; ce->myaiocb.aio_sigevent.sigev_notify=SIGEV_NONE; ce->myaiocb.aio_lio_opcode=(int)op; #endif ce->fd=(int)fd; return(ce); } /************************************************************************* * Put it on the outbound queue. *************************************************************************/ void async_put_on_write_queue(gc,ce) struct cache *gc; struct cache_ent *ce; { ce->forward=0; ce->back=gc->w_tail; if(gc->w_tail) gc->w_tail->forward = ce; gc->w_tail= ce; if(!gc->w_head) gc->w_head=ce; gc->w_count++; return; } /************************************************************************* * Cleanup all outstanding writes *************************************************************************/ void async_write_finish(gc) struct cache *gc; { while(gc->w_head) { /*printf("async_write_finish: Waiting for buffer %x to finish\n",gc->w_head->myaiocb64.aio_buf);*/ async_wait_for_write(gc); } } /************************************************************************* * Wait for an I/O to finish *************************************************************************/ void async_wait_for_write(gc) struct cache *gc; { struct cache_ent *ce; size_t ret,retval; if(gc->w_head==0) return; ce=gc->w_head; gc->w_head=ce->forward; gc->w_count--; ce->forward=0; if(ce==gc->w_tail) gc->w_tail=0; /*printf("Wait for buffer %x offset %lld size %d to finish\n", ce->myaiocb64.aio_buf, ce->myaiocb64.aio_offset, ce->myaiocb64.aio_nbytes); printf("write count %lld \n",gc->w_count); */ #ifdef _LARGEFILE64_SOURCE #ifdef __LP64__ while((ret=aio_error(&ce->myaiocb))== EINPROGRESS) { async_suspend(ce); } #else while((ret=aio_error64(&ce->myaiocb64))== EINPROGRESS) { async_suspend(ce); } #endif #else while((ret=aio_error(&ce->myaiocb))== EINPROGRESS) { async_suspend(ce); } #endif if(ret) { printf("aio_error 5: ret %d %d\n",ret,errno); #ifdef _LARGEFILE64_SOURCE #ifdef __LP64__ printf("fd %d offset %lld size %d\n", ce->myaiocb.aio_fildes, ce->myaiocb.aio_offset, ce->myaiocb.aio_nbytes); #else printf("fd %d offset %lld size %d\n", ce->myaiocb64.aio_fildes, ce->myaiocb64.aio_offset, ce->myaiocb64.aio_nbytes); #endif #else printf("fd %d offset %lld size %d\n", ce->myaiocb.aio_fildes, ce->myaiocb.aio_offset, ce->myaiocb.aio_nbytes); #endif exit(181); } #ifdef _LARGEFILE64_SOURCE #ifdef __LP64__ retval=aio_return(&ce->myaiocb); #else #if defined(__CrayX1__) retval=aio_return64((aiocb64_t *)&ce->myaiocb64); #else retval=aio_return64((struct aiocb64 *)&ce->myaiocb64); #endif #endif #else retval=aio_return(&ce->myaiocb); #endif if((int)retval < 0) { printf("aio_return error: %d\n",errno); } if(!ce->direct) { /* printf("Freeing buffer %x\n",ce->real_address);*/ free((void *)(ce->real_address)); free((void *)ce); } } /************************************************************************* * This routine is a generic async writer assist funtion. It takes * the same calling parameters as write() but also extends the * interface to include: * * offset ..... offset in the file. * depth ..... How much read-ahead do you want. * free_addr .. address of memory to free after write is completed. * *************************************************************************/ size_t async_write_no_copy(gc,fd,buffer,size,offset,depth,free_addr) struct cache *gc; long long fd,size; char *buffer; off64_t offset; long long depth; char *free_addr; { struct cache_ent *ce; size_t ret; long long direct = 1; ce=allocate_write_buffer(gc,fd,offset,size,(long long)LIO_WRITE,depth,direct,buffer,free_addr); ce->direct=0; /* have library de-allocate the buffer */ async_put_on_write_queue(gc,ce); /* printf("awnc: fd %d offset %lld, size %d\n",ce->myaiocb64.aio_fildes, ce->myaiocb64.aio_offset, ce->myaiocb64.aio_nbytes); */ again: #ifdef _LARGEFILE64_SOURCE #ifdef __LP64__ ret=aio_write(&ce->myaiocb); #else ret=aio_write64(&ce->myaiocb64); #endif #else ret=aio_write(&ce->myaiocb); #endif if(ret==-1) { if(errno==EAGAIN) { async_wait_for_write(gc); goto again; } if(errno==0) { /* Compensate for bug in async library */ async_wait_for_write(gc); goto again; } else { printf("Error in aio_write: ret %d errno %d\n",ret,errno); #ifdef _LARGEFILE64_SOURCE #ifdef __LP64__ printf("aio_write_no_copy: fd %d buffer %lx offset %lld size %d\n", ce->myaiocb.aio_fildes, (long)(ce->myaiocb.aio_buf), ce->myaiocb.aio_offset, ce->myaiocb.aio_nbytes); #else printf("aio_write_no_copy: fd %d buffer %lx offset %lld size %d\n", ce->myaiocb64.aio_fildes, (long)(ce->myaiocb64.aio_buf), ce->myaiocb64.aio_offset, ce->myaiocb64.aio_nbytes); #endif #else printf("aio_write_no_copy: fd %d buffer %lx offset %ld size %d\n", ce->myaiocb.aio_fildes, (long)(ce->myaiocb.aio_buf), ce->myaiocb.aio_offset, ce->myaiocb.aio_nbytes); #endif exit(182); } } else { return((ssize_t)size); } } void mbcopy(source, dest, len) char *source,*dest; size_t len; { int i; for(i=0;i