#!/usr/local/bin/pike

// Copyright Fredrik Hübinette, 1998
// Distribution policy: GPL

// TODO list:
//   make --ssh-proxy use ssh-proxy: when possible
//   make ssh-proxy: fallback on --ssh-proxy
//   implement --ssh-proxy for FTPFS
//   use alarm() more to make sure we don't get stuck anywhere

// RCS identification string; also sent (with spaces replaced by '_')
// as part of the HTTP User-agent header.
constant ID="$Id: pcp,v 1.24 1999/05/26 20:46:42 hubbe Exp $";

// Four-byte handshake string a remote pcp server must send before
// any request is answered.
#define PROTOCOL_VERSION "0001"

// How much is read/written at once.
// This is only the starting value: add_sent() adjusts it at run time
// depending on the measured transfer rate.
int BLOCK_SIZE=2048;

// How many seconds should be counted into the 'current bps'
float BPS_SECONDS=60.0;

// This gives more debug messages (higher value = more output)
int debug_level;

// No status messages
int quiet;

// Perform verification
int verify;

// Recurse into subdirectories
int recurse=1;

// Max bps transferred over network.
// The default of 1000000000.0 doubles as "no limit": the rate is only
// forwarded to a remote pcp server when it is below this value.
float read_speed=1000000000.0;

// Array of things *not* to copy
array(Regexp) except=({});

// Optimize host to host transfers?
int optimize=1;

// Arguments given to this command.
// Used as the command prefix when the copy is delegated to a remote pcp.
array(string) args=({"pcp"});

// Don't expand globs
int no_expand;

// Compression?
//   -1: pass no compression options to ssh
//    0: explicitly disable ssh compression
//   >0: enable ssh compression with this CompressionLevel
int compression=-1;

// Total size of files to transfer (-1.0 while still unknown)
float total_size=-1.0;

// Total bytes accounted for
int total_bytes=0;

// Hosts to redirect through (ssh is chained through each of them)
array(string) ssh_proxies=({});

// Use high TCP priority?
int interactive=0; // What user to connect as string default_user; // What passwd to use string default_passwd; #define error(X) throw( ({ X, backtrace() }) ) #define retry() error("retry") #define is_retry(X) (arrayp(X) && sizeof(X) && X[0]=="retry") Process.create_process create_process(array(string) args, mapping options) { object o; mixed err; do { err=catch { o=Process.create_process(args,options); }; } while(err && reverse(err[0])[..3]=="\n11:"); return o; } class Summary { int bytes; int files; int dirs; void create(int b,int f, int d) { bytes=b; files=f; dirs=d; } Summary `+(Summary x) { return Summary(bytes+x->bytes, files+x->files, dirs+x->dirs); } }; class Codec { object objectof(string s) { sscanf(s,"%s(%s)",string ob, string args); switch(ob) { case "Summary": { sscanf(args,"%d,%d,%d",int b, int f, int d); return Summary(b,f,d); } } // throw(({"Cannot decode that!\n",backtrace()})); } string nameof(mixed x) { if(object_program(x)==Summary) return sprintf("Summary(%d,%d,%d)",x->bytes,x->files,x->dirs); // throw(({"Cannot encode that!\n",backtrace()})); } }; class VFileSystem { string query_fs_type(); string mkurl(string path); Stdio.File open(string file, string mode, void|int pos); int file_size(string file, void|int lnk); int cksum(string file); int rm(string file); array(string) get_dir(string dir); int mkdir(string dir); int chmod(string file, int mode) { return 0; }; string readlink(string path) { return 0; }; int symlink(string path, string to) { return 0; }; int file_mode(string file) { return -1; } string combine_path(string dir, string file) { while(dir[strlen(dir)-3..]=="//") dir=dir[strlen(dir)-2..]; if(!dir || dir=="" || dir=="." || dir=="./") { if(!file || file=="" || file=="." || file=="./") return "."; else return file; } if(!file || file=="" || file=="." 
|| file=="./") return dir; if(dir[-1]=='/') return dir+file; return dir+"/"+file; // return predef::combine_path(dir,file); } string basename(string path) { if(path[-1]=='/') path=path[..sizeof(path)-2]; return (path/"/")[-1]; } string dirname(string path) { if(path=="" || path=="." || path=="/") return 0; if(path[-1]=='/') path=path[..sizeof(path)-2]; array(string) tmp=path/"/"; if(sizeof(tmp)==2 && !strlen(tmp[0])) return "/"; return tmp[..sizeof(tmp)-2]*"/"; } int verify(string file) { return -1; } Summary recursive_sizeof(string path) { status_line("Counting %s",path); int size=this_object()->file_size(path); switch(size) { case 0..: return Summary(size,1,0); default: return Summary(0,0,0); case -2: Summary tmp=Summary(0,0,0); foreach(this_object()->get_dir(path)||({}),string file) tmp+=recursive_sizeof(combine_path(path,file)); return tmp; } } string *low_expand(string *path_elements, int pos) { for(;posmkurl(path_elements[..pos]*"/")); if(base=="") base="."; debug(3,"low_expand: isdir(%s)\n",base); if(this_object()->file_size(base)!=-2) return ({}); debug(3,"low_expand: get_dir(%s)\n",base); for(int retries=0;retries<20;retries++) { if(tmp=this_object()->get_dir(base)) break; } if(!tmp) return ({}); string save=path_elements[pos]; debug(1,"Globbing %O against '%s'\n",tmp,save); foreach(glob(save,tmp),string tmp2) { path_elements[pos]=tmp2; tmp=low_expand(path_elements,pos+1); path_elements[pos]=save; if(!tmp) return 0; ret+=tmp; } return ret; } } debug(3,"low_expand -> %s\n",path_elements*"/"); return ({path_elements*"/"}); } string *expand(string *paths) { if(no_expand) return paths; string *ret=({}); debug(2,"Expanding %O\n",paths); foreach(paths, string path) { string *tmp=low_expand(path/"/",0); if(!tmp) { debug(2,"Expand failed.\n"); return 0; } ret+=tmp; } debug(2,"Expanded to: %O\n",ret); status_line(""); status_line(""); return ret; } array map(array foo, string func,mixed ... 
data) { return Array.map(foo,`->(this_object(),func),@data); } } array(int|string) decode_host(string host, string user, string pw, int port, int bufsize) { if(sscanf(reverse(host),"%s@%s",host,user)) { user=reverse(user); host=reverse(host); if(!sscanf(user,"%s:%s",user,pw)) if(default_passwd) pw=default_passwd; }else{ if(default_user) user=default_user; if(default_passwd) pw=default_passwd; } sscanf(host,"%s:%d",host,port); sscanf(host,"%s[%d]",host,bufsize); return ({ user, pw, host, port, bufsize }); } class CacheVFileSystem { inherit VFileSystem; mapping(string:int) flcache=([]); mapping(string:string) lnk=([]); mapping(string:int) modecache=([]); static string get_end_of_ls_line(string line) { string *elems=line/" "; int count=0; if(line[0]=='c') count--; for(int e=0;e %s",f,string link)) { debug(2,"\nLINK[%s%s]=%s\n",dir,f,link); lnk[dir+f]=link; } break; default: continue; case '-': size=(int)x[4]; } int mode=parse_mode(x[0]); ret+=({f}); debug(2,"\nFLCACHE[%s%s]=%d\n",dir,f,size); flcache[dir+f]=size; modecache[dir+f]=mode; if(size==-2) { debug(2,"\nFLCACHE[%s%s/]=%d\n",dir,f,size); flcache[dir+f+"/"]=size; modecache[dir+f+"/"]=mode; } } return sort(ret); } string readlink(string file) { if(file_size(file,1)==-3) return lnk[file]; } } // Internal stuff for add_sent int bytes; float bps=1.0; // Acceptable guess int base_time=time(); #if 0 class TimeStamp { int bytes; float time; }; array(TimeStamp) timestamps=allocate(1000); int first_timestamp; int last_timestamp; int add_sent_bytes; float first_bps_time=-1.0; float last_block_size_change=0.0; void add_sent(int|string b, void|function updater) { float current_time; if(first_bps_time==-1) { last_block_size_change=first_bps_time=time(base_time); return; } while(1) { if(stringp(b)) b=strlen(b); bytes+=b; current_time=time(base_time); while(first_timestamp < last_timestamp && current_time - timestamps[first_timestamp]->time > BPS_SECONDS) { add_sent_bytes-=timestamps[first_timestamp]->bytes; 
timestamps[first_timestamp]=0; first_timestamp++; } if(last_timestamp==sizeof(timestamps)) { timestamps=timestamps[first_timestamp..]+allocate(last_timestamp-first_timestamp); last_timestamp-=first_timestamp; first_timestamp=0; } timestamps[last_timestamp]=TimeStamp(); timestamps[last_timestamp]->time=current_time; timestamps[last_timestamp]->bytes=b; last_timestamp++; add_sent_bytes+=b; if(first_timestamp < last_timestamp) { float t=current_time - timestamps[first_timestamp]->time; int byt=add_sent_bytes-timestamps[first_timestamp]->bytes; debug(3,"TIMESTAMPS: %d dt %f db %d\n", last_timestamp-first_timestamp, t, byt); if(t>0.0) { bps=byt/t; }else{ debug(2,"No delta time2\n"); bps=0.0; } if(bps>read_speed) { float sleep_time=b/read_speed-t; debug(3,"\nSleeping %f seconds.\n",sleep_time); sleep(sleep_time); b=0; continue; } }else{ debug(2,"No delta time\n"); bps=0.0; } break; } if(current_time-last_block_size_change > BPS_SECONDS) { if(10*BLOCK_SIZE128 && BLOCK_SIZE>bps) { BLOCK_SIZE/=2; last_block_size_change=current_time; debug(1,"BLOCK_SIZE=%d\n",BLOCK_SIZE); } } } } #else float last_bps_time=-1.0; float first_bps_time=-1.0; float last_block_size_change=0.0; // Routines to keep track of how much data is sent over the network void add_sent(int|string b, void|function updater) { if(first_bps_time==-1) { last_block_size_change=first_bps_time=last_bps_time=time(base_time); return; } if(stringp(b)) b=strlen(b); float t=time(base_time); float bpssec=max(0.0,min(BPS_SECONDS, (t-first_bps_time)/3.0))+1.0; float bps_frac=1.0-1.0/bpssec; bytes+=b; debug(3,"\nBPS=%f %f %f %f %d",bps,bps_frac,t-last_bps_time,t,b); bps = bps * pow(bps_frac,(float)(t-last_bps_time)) + b/bpssec; debug(3," BPS=%f\n",bps); last_bps_time=t; if(bps > read_speed) { float wakeup_time=log(read_speed/bps) / log(bps_frac); debug(3,"\nSleeping %f seconds.\n",wakeup_time); if(updater) { wakeup_time+=t; while(1) { float f=wakeup_time - time(base_time); if(f<1.1) { sleep(f); break; }else{ sleep(1.0); 
updater(); } } while(time(base_time) < wakeup_time) { sleep( min(1.0, wakeup_time - t)); updater(); } }else{ sleep(wakeup_time); } t=time(base_time); float bpssec=max(0.0,min(BPS_SECONDS, (t-first_bps_time)/3.0))+1.0; bps = bps * pow(1.0-1.0/bpssec,(float)(t-last_bps_time)); last_bps_time=t; } if(t-last_block_size_change > BPS_SECONDS/2.0) { if(10*BLOCK_SIZE128 && BLOCK_SIZE>bps) { BLOCK_SIZE/=2; last_block_size_change=t; debug(1,"BLOCK_SIZE=%d\n",BLOCK_SIZE); } } } } #endif class Bps { float t=time(base_time); int b=bytes; float bps() { catch { return (bytes-b) / (time(base_time)-t); }; return 0.0; } } // Internal variable, tells weather we need to output a newline // before proceeding or not. int need_newline; int last_status_line_length; void output(string fmt, mixed ... args) { string s=sprintf(fmt,@args); switch(s[0]) { case '\r': break; case '\n': if(!need_newline) s=s[1..]; break; default: } werror(s); if(!(need_newline=s[-1]!='\n')) last_status_line_length=0; } void debug(int level, string fmt, mixed ... args) { if(level <= debug_level) output(fmt,@args); } void status(string fmt, mixed ... args) { if(!quiet) output(fmt,@args); } void status_line(string fmt, mixed ... 
args) { if(!quiet) { array(string) lines=sprintf(fmt, @args)/"\n"; if(strlen(lines[-1])>78) { lines[-1]="<<"+lines[-1][strlen(lines[-1])-76..]; } int l=strlen(lines[-1]); if(last_status_line_length>strlen(lines[0])) lines[0]+=" "*(last_status_line_length-strlen(lines[0])); output("\r"+lines*"\n"); last_status_line_length=l; } } void erase_status_line() { status_line(""); status_line(""); } string sh_quote(string s) { // werror("QUOTING "+s+"\n"); string ret=""; while(sscanf(s,"%[-a-zA-Z0-9_@./]%c%s",string ok, int c, s)==3) ret+=sprintf("%s\\%c",ok,c); // werror("QUOTED: "+ret+s+"\n"); return ret+s; } class ToDo { VFileSystem from_fs; string path; Summary summary; void create(VFileSystem fs, string f, Summary s) { from_fs=fs; path=f; summary=s; } }; class CopyThread { class RemoteFS { inherit CacheVFileSystem; string host; string user; array(string) local_ssh_proxies=ssh_proxies; string query_fs_type() { return "ssh"; } string mkurl(string path) { string h=host; if(user) h=user+"@"+host; return "ssh://"+h+"/"+path; } class Remote { string file; int pos; inherit Stdio.File : data; object proc; int waitforproc=1; string my_quote_sh_cmd(string s) { return replace(s, ({"(",")","`","\"","\\","'","$","&"}), ({"\\(","\\)","\\`","\\\"","\\\\","\\'","\\$","\\&"})); } void create(string cmd) { Stdio.File x=pipe(Stdio.PROP_BIDIRECTIONAL); string *ssh_cmd=({"ssh"}); switch(compression) { case 0: ssh_cmd+=({"-o","Compression no"}); break; default: ssh_cmd+=({"-C","-o","CompressionLevel "+compression}); case -1: } debug(2,"local proxies: %O\n",local_ssh_proxies); foreach(local_ssh_proxies,string host) { cmd=my_quote_sh_cmd(cmd); if(!interactive) ssh_cmd+=({"-x"}); if(debug_level>2) ssh_cmd+=({"-v"}); ssh_cmd+=({host,"ssh"}); } if(user) ssh_cmd+=({"-l",user}); if(debug_level>2) ssh_cmd+=({"-v"}); if(!interactive) ssh_cmd+=({"-x"}); // ssh_cmd+=({"-a"}); debug(2,"Remote: %O\n",ssh_cmd+({host,cmd})); proc=create_process(ssh_cmd+({host,cmd}), (["stdin":x,"stdout":x])); // 
add_sent(1024); } string read(void|int x, void|int y) { string ret; switch(query_num_arg()) { case 2: ret=data::read(x,y); break; case 1: ret=data::read(x); break; case 0: ret=data::read(); } pos+=strlen(ret||""); waitforproc=0; return ret; } int write(string s) { waitforproc=1; int ret=data::write(s); m_delete(flcache,file); return ret; } int tell() { return pos; } int wait() { return proc->wait(); } int status() { return proc->status(); } void destroy() { if(_fd) close(); if(proc && waitforproc) { // kill(proc,signum("SIGTERM")); proc->wait(); } } } Stdio.File open(string file, string mode, void|int pos) { switch(mode) { case "r": Remote data=Remote("dd " "2>/dev/null "+ " ibs="+BLOCK_SIZE+ " obs="+BLOCK_SIZE+ " skip="+(pos/BLOCK_SIZE)+ " if="+sh_quote(file)); data->pos=pos - pos%BLOCK_SIZE; data->file=file; return data; case "wct": case "wtc": case "twc": case "tcw": case "cwt": case "ctw": if(pos) return 0; if(!rm(file)) return 0; case "cwa": case "caw": case "wca": case "acw": case "wac": case "awc": Remote data=Remote("/bin/cat >>"+sh_quote(file)); data->pos=file_size(file); data->file=file; return data; } } string safe_remote_popen(string cmd) { while(1) { object o=Remote("echo Connected;"+cmd+";echo Disconnected"); if(string tmp=o->read()) { tmp-="\r"; if(tmp[..9]=="Connected\n" && tmp[strlen(tmp)-13..]=="Disconnected\n") return tmp[10..strlen(tmp)-14]; ; sleep(10); } } } int safe_remote_system(string cmd) { while(1) { object o=Remote("echo Connected;"+cmd); if(string tmp=o->read()) { tmp-="\r"; if(tmp[..9]=="Connected\n") return o->wait(); sleep(10); } } } string do_ls(string where,void|string xflags) { return safe_remote_popen("/bin/ls -l"+(xflags||"")+" "+sh_quote(where)); } array(string) get_dir(string dir) { string tmp=do_ls((dir&&strlen(dir))?dir:"."); // add_sent(tmp); return parse_directory(dir,tmp); } int low_file_size(string file) { if(file=="") file="."; if(!zero_type(flcache[file])) return flcache[file]; if(string dir=dirname(file)) { 
get_dir(dir); }else{ parse_directory("",do_ls(file,"d"),1); } if(!zero_type(flcache[file])) return flcache[file]; return -1; } int file_size(string file, void|int lnk) { int size=low_file_size(file); if(lnk) return size; switch(size) { /* Link or special file */ case -3: case -4: string tmp=safe_remote_popen("wc -c "+sh_quote(file)); if(sscanf(tmp,"%d ",size)) return flcache[file]=size; default: return size; } } int cksum(string file) { string tmp=safe_remote_popen("cksum "+sh_quote(file)); // add_sent(tmp); return ((int)tmp) || 1; } int rm(string file) { return !safe_remote_system("rm "+sh_quote(file)); } int mkdir(string file) { return !safe_remote_system("mkdir "+sh_quote(file)); } int verify(string path) { if(sscanf(reverse(path),"%s.",string ext)) { switch(reverse(ext)) { case "gz": case "tgz": case "Z": return !safe_remote_system("gzip -t "+sh_quote(path)); } } return -1; } void create(string h) { host=h; sscanf(host,"%s@%s",user,host); } } class LocalFS { inherit VFileSystem; string query_fs_type() { return "file"; } string mkurl(string path) { return "file://"+path; } Stdio.File open(string file, string mode, void|int pos) { Stdio.File f=Stdio.File(); if(!f->open(file,mode)) return 0; f->seek(pos); return f; } mixed low_file_stat(string file,int lnk) { if(file=="") file="."; if(file[-1]=='/') file+="."; return file_stat(file,lnk); } int low_file_size(string file,int lnk) { mixed s=low_file_stat(file,lnk); if(!s) return -1; return s[1]; } int file_size(string file,int|void lnk) { int size=low_file_size(file,lnk); if(lnk) return size; switch(size) { /* Link or special file */ case -3: return -3; case -4: string tmp=Process.popen("wc -c "+sh_quote(file)); if(sscanf(tmp,"%d ",size)) return size; default: return size; } } int cksum(string file) { return ((int)Process.popen("cksum "+sh_quote(file))) || 1; } int rm(string file) { if(file[-1]=='/') file=file[..sizeof(file)-2]; return predef::rm(file); } int mkdir(string file) { if(file[-1]=='/') 
file=file[..sizeof(file)-2]; return predef::mkdir(file); } int chmod(string file, int mode) { return !catch { predef::chmod(file,mode); }; } int file_mode(string file) { mixed s=low_file_stat(file,0); if(!s) return -1; return s & 07777; } string readlink(string s) { return predef::readlink(s); } int symlink(string s, string to) { return !catch { predef::symlink(s,to); }; } array(string) get_dir(string dir) { if(!strlen(dir)) dir="."; else if(dir[-1]=='/') dir+="."; return predef::get_dir(dir); } int verify(string path) { if(sscanf(reverse(path),"%s.",string ext)) { switch(reverse(ext)) { case "gz": case "tgz": case "Z": return !Process.system("gzip -t "+sh_quote(path)); } } return -1; } } class SmartRemoteFS { int buffersize=10; inherit RemoteFS; #define RETRY do{ debug(2,"RETRY at %d\n", __LINE__); if(ServerConn) destruct(ServerConn); retry(); }while(0) #define TRY while(1) { if(mixed _err=catch { #define UNTIL_DONE }) { if(!is_retry(_err)) throw(_err); sleep(1); continue; } break; } string query_fs_type() { return "ssh-proxy"; } string mkurl(string path) { // werror("%s\n",path); if(sscanf(path,"file:%s", string tmp) && buffersize<20) { sscanf(tmp,"//%s",tmp); return ::mkurl(tmp); }else{ string h=host; if(buffersize!=10) h+="["+buffersize+"]"; if(user) h=user+"@"+host; return "ssh-proxy://"+h+"/"+path; } } void create(string h) { ::create(h); sscanf(host,"%s[%d]",host,buffersize); } Remote ServerConn; Codec codec=Codec(); int need_to_read_version; Remote start() { if(ServerConn) return ServerConn; string cmd="( test \"x`type pcp 2>/dev/null`\" != x && pcp"; if(debug_level) cmd+=" -d"+debug_level; if(read_speed < 1000000000.0) cmd+=" -s"+read_speed; cmd+=sprintf(" --pcp-server-mode=%d",buffersize); debug(2,"\nStarting remote pcp server.\n"); cmd+=" ) || echo WRONG"; need_to_read_version=1; return ServerConn=Remote(cmd); } void read_version() { if(need_to_read_version) { if(!ServerConn) RETRY; if(ServerConn->read(4)!=PROTOCOL_VERSION) RETRY; need_to_read_version=0; 
} } int test() { TRY { debug(2,"Testing if remoe has pcp.\n"); start(); string s=ServerConn->read(4); if(strlen(s)!=4) RETRY; need_to_read_version=0; return s==PROTOCOL_VERSION; } UNTIL_DONE; } class FakeFunction { string fun_name; void create(string f) { fun_name=f; } mixed `()(mixed ... args) { TRY { debug(2,"\nCalling fake function %s%O\n",fun_name,args); start(); mixed data=encode_value( ({fun_name})+args,codec); if(ServerConn->write(sprintf("%4c%s",strlen(data),data))!=strlen(data)+4) RETRY; read_version(); string s=ServerConn->read(4); if(!s) RETRY; int len; if(!sscanf(s,"%4c",len)) RETRY; data=ServerConn->read(len); if(strlen(data) != len) RETRY; data=decode_value(data,codec); debug(2,"\nFake function %s returns: %O\n",fun_name,data); return data; } UNTIL_DONE; } } string combine_path(string dir, string file) { if(strlen(dir) && dir[-1]==':') return dir+file; return ::combine_path(dir,file); } string basename(string dir) { if(search(dir,"/")==-1 && sscanf(dir,"%*s:%s",dir)) return dir; return ::basename(dir); } function `->(string x) { switch(x) { case "file_size": case "cksum": case "get_dir": case "rm": case "chmod": case "readlink": case "mkdir": case "verify": case "recursive_sizeof": case "expand": case "map": case "basename": debug(2,"\nFaking function %s:%s\n",host,x); return FakeFunction(x); } return this_object()[x]; } class FakeFile { Remote SCon; string ibuf=""; int pos; int writing; int err=0; int errno() { return err; } int get_return_code() { string tmp=SCon->read(4); sscanf(tmp,"%4c",int len); tmp=SCon->read(len); } string read(int|int howmuch, int|int blocking) { string ret; if(writing) error("File not open for reading.\n"); if(!query_num_arg()) howmuch=0x7fffffff; while(SCon && (blocking?strlen(ibuf)read(4); sscanf(tmp,"%4c",len); debug(3,"Fakefile read() got len=%d\n",len); if(!len) { get_return_code(); if(!ServerConn) { ServerConn=SCon; SCon=0; }else{ destruct(SCon); SCon=0; } }else{ string tmp=SCon->read(len); if(!tmp || strlen(tmp) != 
len) { debug(3,"Fakefile failed read() got len=%d\n",strlen(tmp||"")); destruct(SCon); tmp=""; break; } ibuf+=tmp; } } if(howmuch>=strlen(ibuf)) { ret=ibuf; ibuf=""; }else{ ret=ibuf[..howmuch-1]; ibuf=ibuf[howmuch..]; } pos+=strlen(ret); return ret; } int write(string data) { if(!writing || !SCon) error("File not open for writing.\n"); if(!strlen(data)) return 0; int len=SCon->write(sprintf("%4c%s",strlen(data),data)); if(len!=strlen(data)+4) { debug(2,"FakeFile write failed %d != %d+4\n",len,strlen(data)); err=SCon->errno(); SCon->waitforproc=0; destruct(SCon); return -1; } pos+=strlen(data); return strlen(data); } void create(string mode) { writing=mode == "w"; read_version(); SCon=ServerConn; ServerConn=0; string tmp=SCon->read(4); if(strlen(tmp||"")!=4) { destruct(SCon); RETRY; } sscanf(tmp,"%4c",pos); debug(3,"Fakefile starting at %d, mode = %s\n",pos,mode); } int close(void|string how) { if(!SCon) return 0; if(writing) { int r=SCon->write(sprintf("%4c",0)); if(r!=4) error(sprintf("Failed to close (%d != 4).\n",r)); get_return_code(); if(!ServerConn) { ServerConn=SCon; SCon=0; }else{ destruct(SCon); } }else{ destruct(SCon); } return 1; } int tell() { return pos; } void destroy() { close(); } } Stdio.File open(string file, string mode, void|int pos) { TRY { debug(2,"\nOpening %s %s %d\n",file,mode,pos); if(search(mode,"r")!=-1) { mixed cmd=encode_value( ({"bincatout",file,mode,pos }), codec); if(start()->write(sprintf("%4c%s",strlen(cmd),cmd))!=strlen(cmd)+4) RETRY; return FakeFile("r"); } if(search(mode,"w")!=-1) { mixed cmd=encode_value( ({"bincatin",file,mode,pos }),codec); if(start()->write(sprintf("%4c%s",strlen(cmd),cmd))!=strlen(cmd)+4) RETRY; return FakeFile("w"); } return 0; } UNTIL_DONE; } int do_optimize(array(string) cmd) { cmd=Array.map(args+cmd,sh_quote); object r=Remote(cmd*" "); cat(r); return r->wait(); } }; mapping(string:mapping(string:int)) supports_cache=([]); class FTPFS { inherit CacheVFileSystem; string host; int port=21; string 
user="anonymous"; string pw="pcp@"; #define RETRY do{ debug(2,"RETRY at %d\n", __LINE__); destruct(ftpserv); retry(); }while(0) #define TRY while(1) { if(mixed _err=catch { #define UNTIL_DONE }) { if(!is_retry(_err)) throw(_err); sleep(10); continue; } break; } Stdio.FILE ftpserv; mapping(string:int) supports=([]); string query_fs_type() { return "ftp"; } string mkurl(string path) { string h=host; if(port!=21) h+=":"+port; if(user!="anonymous" || pw!="pcp@") { string u=user || "anonymous"; if(pw) u+=":"+pw; h=u+"@"+h; } return "ftp://"+h+"/"+path; } string body; int ftp_answer() { body=""; int seen_minus; while(1) { int i,c; string data; string s=ftpserv->gets(); if(!s) RETRY; debug(2,"ftp%s< %O\n",host,s); if(sscanf(s,"%d%c%s",i, c, data)!=3) { if(seen_minus) { data=s; c='-'; }else{ RETRY; } } body+=data+"\n"; if(c!='-') return i; else seen_minus++; } } int ftp_success() { while(1) { int ret=ftp_answer(); switch(ret/100) { case 1: continue; /* In progress - wait */ case 2: return ret; /* Ok, done */ case 3: return ret; /* Ok, send next command in sequence */ case 4: return 0; /* Transient failiure */ case 5: return 0; /* Failure */ default: RETRY; /* Buggy FTP server */ } } } int await_return_code; int send_cmd(string cmd) { if(await_return_code) { await_return_code=0; ftp_success(); } debug(2,"FTP%s> %s\n",host,cmd); cmd+="\r\n"; if(!ftpserv) RETRY; if(ftpserv->write(cmd)!=strlen(cmd)) RETRY; } int very_low_ftpcmd(string cmd) { send_cmd(cmd); return ftp_answer(); } int low_ftpcmd(string cmd) { int ret=very_low_ftpcmd(cmd); while(ret/100==1) ret=ftp_answer(); return ret; } int ftpcmd(string cmd) { send_cmd(cmd); return ftp_success(); } int do_connect() { if(ftpserv) return 1; ftpserv=Stdio.FILE(); debug(2,"FTP: Connecting to %s:%d\n",host,port); int x; if(catch { x=ftpserv->connect(host,port); }) RETRY; if(!x) { werror("Failed to connect to %s:%d\n",host,port); RETRY; } debug(2,"FTP: Connected\n"); if(!ftp_success()) { werror("No ftp answer at %s:%d\n",host,port); 
RETRY; } debug(2,"FTP: Sending USER\n"); int ret=ftpcmd("USER "+user); switch(ret/100) { default: werror("Login failed.\n"); RETRY; case 3: if(user!="anonymous" && pw=="pcp@a.b") { pw=readline(sprintf("password for %s@%s: ",user,host)); if(!pw) exit(0); } debug(2,"FTP: Sending PASS\n"); if(!ftpcmd("PASS "+pw)) { werror("Failed to login: %s.\n",body); RETRY; } case 2: } debug(2,"FTP: Switching to binary mode\n"); ftpcmd("TYPE I"); return 1; } string iface; Stdio.Port low_ftpport() { Stdio.Port p=Stdio.Port(); if(!iface) { do_connect(); iface=(ftpserv->query_address(1)/" ")[0]; } int e; for(e=0;e<10;e++) if(p->bind(0,0,iface)) break; if(e==10) { iface=0; sleep(10); RETRY; } sscanf(p->query_address(),"%*s %d",int local_port); sscanf(iface,"%d.%d.%d.%d",int a,int b,int c,int d); int ret=ftpcmd(sprintf("PORT %d,%d,%d,%d,%d,%d",a,b,c,d, local_port/256,local_port%256)); if(!ret) { predef::write("Port command failed. %d %s",ret,body); RETRY; } return p; } class pasv_ftpport { private int p; private Stdio.File o; Stdio.File accept() { return o; } void create(int p) { o=Stdio.File(); debug(2,"FTP: Connecting to %s:%d\n",host,p); if(o->connect(host,p)) { supports->PASV=1; debug(2,"FTP: Connection worked!\n"); }else{ debug(2,"FTP: Connection failed\n"); o=0; } } } Stdio.Port ftpport() { if(supports->PASV >= 0) { do_connect(); if(ftpcmd("PASV")) if(sscanf(body,"%*s(%*d,%*d,%*d,%*d,%d,%d)",int low, int high)==7) return pasv_ftpport(low*256 + high); supports->PASV=-1; } return low_ftpport(); } string get_ftp_data(string cmd) { string tmp=""; Stdio.Port p=ftpport(); if(very_low_ftpcmd(cmd)/100!=1) { predef::write("Command failed.\n"); RETRY; } Stdio.File o=p->accept(); if(!o) RETRY; while(string s=o->read(1000,1)) { if(!strlen(s)) break; // add_sent(s); tmp+=s; } destruct(o); if(!ftp_success()) { predef::write("Command failed.\n"); RETRY; } return tmp; } array(string) get_dir(string dir) { string tmp; TRY { do_connect(); string tmp=get_ftp_data("LIST -l 
"+((dir&&strlen(dir))?dir:".")); // add_sent(tmp); return parse_directory(dir,tmp); } UNTIL_DONE; } int file_size(string file,void|int lnk) { TRY { if(!zero_type(flcache[file])) return flcache[file]; if(supports->SIZE >= 0 && !lnk) { do_connect(); int ret=low_ftpcmd("SIZE "+file); switch(ret) { case 213: supports->SIZE=1; if((int)body) { return flcache[file]=(int)body; } break; /* We cannot trust SIZE */ default: supports->SIZE=-1; case 550: /* Directory or nonexistant */ } } string dir=dirname(file); if(!dir) return flcache[file]=-2; /* Directory */ get_dir(dir); if(!zero_type(flcache[file])) return flcache[file]; return -1; } UNTIL_DONE; } class Remote { string file; int pos; inherit Stdio.File : data; string read(void|int x, void|int y) { string ret; switch(query_num_arg()) { case 2: ret=data::read(x,y); break; case 1: ret=data::read(x); break; case 0: ret=data::read(); } pos+=strlen(ret||""); return ret; } int write(string s) { int ret=data::write(s); m_delete(flcache,file); return ret; } int tell() { return pos; } } Stdio.File open(string file, string mode, void|int pos) { Stdio.Port p; TRY { do_connect(); switch(mode) { case "r": p=ftpport(); if(pos) if(!ftpcmd("REST "+pos)) pos=0; int ret=very_low_ftpcmd("RETR "+file); if(ret/100!=1) { werror("RETR failed %d:%s\n",ret,body); RETRY; } await_return_code=1; break; case "wct": case "wtc": case "twc": case "tcw": case "cwt": case "ctw": if(pos) return 0; if(!rm(file)) return 0; case "cwa": case "caw": case "wca": case "acw": case "wac": case "awc": pos=file_size(file); p=ftpport(); ret=very_low_ftpcmd("APPE "+file); if(ret/100!=1) { werror("APPE failed: %d %s\n",ret,body); RETRY; } await_return_code=1; break; default: return 0; } if(!p) return 0; Stdio.File data=Remote(); object x=p->accept(); if(!x) RETRY; data->assign(x); data->file=file; data->pos=pos; return data; } UNTIL_DONE; } int rm(string file) { TRY { do_connect(); return ftpcmd("DELE "+file); } UNTIL_DONE; } int mkdir(string dir) { TRY { do_connect(); 
return ftpcmd("MKD "+dir); } UNTIL_DONE; } int cksum(string file) { return 0; } void create(string hostdata) { host=hostdata; if(!(supports=supports_cache[hostdata])) supports=supports_cache[hostdata]=([]); if(sscanf(reverse(host),"%s@%s",host,user)) { user=reverse(user); host=reverse(host); sscanf(user,"%s:%s",user,pw); } sscanf(host,"%s:%d",host,port); TRY { do_connect(); } UNTIL_DONE ; } int verify(string path) { return -1; } int do_optimize(string from, FTPFS tofs, string to) { if(supports->PASV<0) return 0; TRY { int ret,a,b,c,d; int size=tofs->file_size(to); if(size==-1) size=0; do_connect(); if(!ftpcmd("PASV")) { supports->PASV=-1; return 0; } sscanf(body,"%*s(%*d,%*d,%*d,%*d,%d,%d)",int low, int high); if(catch { string ip=ftpserv->query_address(); debug(2,"IP: %s -> %s\n",host,ip); sscanf(ip,"%d.%d.%d.%d",a,b,c,d); }) return 0; if(!tofs->ftpcmd(sprintf("PORT %d,%d,%d,%d,%d,%d",a,b,c,d,low,high))) { werror("PORT failed.\n"); return 0; } if(size) if(!ftpcmd("REST "+size)) size=0; ret=tofs->very_low_ftpcmd((size?"APPE":"STOR")+" "+to); if(ret/100!=1) { werror("APPE/STOR failed.\n"); RETRY; } ret=very_low_ftpcmd("RETR "+from); if(ret/100!=1) { werror("RETR failed.\n"); RETRY; } // NO, this line is NOT missing a '&' /hubbe if(!(ftp_success() & tofs->ftp_success())) RETRY; return 1; } UNTIL_DONE; } }; #if 1 class HTTPFS { inherit VFileSystem; string host; int port=80; string user; string pw; #define RETRY do{ debug(2,"HTTP RETRY at %d\n", __LINE__); retry(); }while(0) #define TRY while(1) { if(mixed _err=catch { #define UNTIL_DONE }) { if(!is_retry(_err)) throw(_err); sleep(1); continue; } break; } string query_fs_type() { return "http"; } string mkurl(string path) { string h=host; if(port!=80) h+=":"+port; if(user || pw) { string u=user || "anonymous"; if(pw) u+=":"+pw; h=u+"@"+h; } return "http://"+h+"/"+path; } string dirname(string path) { if(path=="" || path=="." 
|| path=="/") return 0;
    array(string) tmp=path/"/";
    return tmp[..sizeof(tmp)-2]*"/";
  }

  // Running ssh port-forwarding process used to reach the HTTP server
  // through the configured ssh_proxies; reused until it exits.
  object ssh_proxy;

  // One HTTP/1.0 GET request. The object is the connection itself
  // (it inherits Stdio.FILE); after create() returns, `code' holds the
  // response status (206 is mapped to 200), `headers' the lower-cased
  // response headers and `pos' the byte offset the body starts at.
  class Request
  {
    inherit Stdio.FILE : data;

    int pos,code;
    mapping headers=([]);

    // path:       path part of the URL, without the leading slash
    // mode:       unused here
    // wanted_pos: if non-zero, ask the server for a byte range starting here
    void create(string path, string mode, int wanted_pos)
    {
      string h=host;
      int p=port;

      if(sizeof(ssh_proxies))
      {
        // (Re)start the tunnel if we have none or the old one has exited.
        if(!ssh_proxy || ssh_proxy->status())
        {
          Stdio.File f=Stdio.File();
          array(string) cmd,ssh_cmd=({"echo OK;sleep 900"});
          Stdio.File x=f->pipe(Stdio.PROP_BIDIRECTIONAL);

          // Build the nested "ssh -L... host ssh -L... host ..." command
          // from the innermost hop outwards. After each hop the next one
          // connects to the local end of the previous forward.
          foreach(reverse(ssh_proxies),string host)
          {
            int lport=6000+random(10000);
            cmd=({"ssh",sprintf("-L%d:%s:%d",lport,h,p)});
            // if(h!="localhost") cmd+=({"-a"});
            p=lport;
            h="localhost";
            if(!interactive) cmd+=({"-x"});
            if(debug_level>2) cmd+=({"-v"});
            cmd+=({host});
            ssh_cmd=cmd+ssh_cmd;
          }

          // Strip the outermost "ssh" so that the local options below can
          // be inserted right after the executable name.
          ssh_cmd=ssh_cmd[1..];
          cmd=({"ssh"});
          switch(compression)
          {
            case 0:
              cmd+=({"-o","Compression no"});
              break;

            default:
              // Must append: assigning here used to throw away the "ssh"
              // executable name and produced an unrunnable command line.
              cmd+=({"-C","-o","CompressionLevel "+compression});

            case -1:
          }
          if(user) cmd+=({"-l",user});
          cmd+=ssh_cmd;
          debug(2,"HTTP: using ssh proxy cmd: %O\n",cmd);
          ssh_proxy=create_process(cmd,(["stdin":x,"stdout":x]));
          destruct(x);
          // The remote command prints "OK" once the whole chain is up.
          if(f->read(2)!="OK") RETRY;
        }
      }

      int x;
      if(catch { x=connect(h,p); } || !x) RETRY;
      debug(2,"Connected to %s:%d\n",host,port);

      string req=sprintf("GET /%s HTTP/1.0\r\n"
                         "Host: %s\r\n"
                         "User-agent: pcp/%s\r\n"
                         "Accept: */*\r\n",
                         path,
                         host,
                         replace(ID," ","_"));
      if(wanted_pos)
        req+=sprintf("Range: bytes=%d-\r\n",wanted_pos);
      if(user || pw)
      {
        // pw may be unset when only a user name was given; avoid
        // concatenating the integer 0 into the credentials.
        req+=sprintf("Authorization: Basic %s\r\n",
                     MIME.encode_base64((user||"anonymous") +":"+ (pw||"")));
      }
      req+="\r\n";
      write(req);
      debug(2,"SENT: %s",req);

      string s=gets();
      if(!s) RETRY;
      s-="\r";
      sscanf(lower_case(s),"http%s %d %s",string version, code, string rest);
      debug(3,"Return line: %s\n",s);
      debug(2,"Return Code: %d\n",code);
      switch(code)
      {
        case 206:
          // Partial content: the server honoured our Range header.
          pos=wanted_pos;
          code=200;
          break;

        case 200:
          pos=0;
      }

      // Read response headers up to the empty line.
      while(1)
      {
        string l=gets();
        if(!l) break;
        l-="\r";
        if(l=="") break;
        if(sscanf(l,"%s: %s",string x, string y)==2)
        {
          debug(3,"Got header %s = %s\n",x,y);
          // A value ending in a space is continued on the next line.
          // Guard against empty values and against EOF while continuing.
          while(strlen(y) && y[-1]==' ')
          {
            string more=gets();
            if(!more) break;
            y=y[..strlen(y)-2]+(more-"\r");
          }
          x=lower_case(x);
          if(headers[x])
            headers[x]+="\n"+y;
          else
            headers[x]=y;
        }else{
          debug(3,"Got some line: %s\n",l);
        }
      }
      debug(3,"Headers = %O\n",headers);
    }

    // Same calling conventions as the inherited read(), but keeps `pos'
    // in step with the number of body bytes handed out.
    string read(void|int x, void|int y)
    {
      string ret;
      switch(query_num_arg())
      {
        case 2: ret=data::read(x,y); break;
        case 1: ret=data::read(x); break;
        case 0: ret=data::read();
      }
      // read() yields 0 on error; do not call strlen() on that.
      if(ret) pos+=strlen(ret);
      return ret;
    }

    int write(string s)
    {
      int ret=data::write(s);
      // m_delete(flcache,file);
      return ret;
    }

    // Offset in the remote file of the next byte read() will return.
    int tell() { return pos; }
  }

  // Request opened by get_headers() that has not been consumed yet, and
  // the path it was made for. open() hands it out instead of reconnecting.
  Stdio.File ready_to_read;
  string ready_to_read_file;

  // Only mode "r" is handled; any other mode falls out of the switch.
  // Returns 0 when the server did not answer 200/206.
  Stdio.File open(string file, string mode, void|int pos)
  {
    TRY {
      switch(mode)
      {
        case "r":
          if(ready_to_read_file==file && pos==ready_to_read->tell())
          {
            object tmp=ready_to_read;
            ready_to_read_file=0;
            ready_to_read=0;
            return tmp;
          }
          object data=Request(file,"",pos);
          if(data->code!=200) return 0;
          return data;
      }
    } UNTIL_DONE;
  }

  // Returns the response headers for path, or 0 if the request failed.
  // The connection is kept in ready_to_read for a following open().
  mapping get_headers(string path)
  {
    if(!ready_to_read_file || path!=ready_to_read_file)
    {
      ready_to_read=0;
      ready_to_read=Request(path,"",0);
      if(!ready_to_read ||
         ready_to_read->code!=200 ||
         !ready_to_read->headers)
      {
        ready_to_read=0;
        ready_to_read_file=0;
        return 0;
      }
      ready_to_read_file=path;
    }
    return ready_to_read->headers;
  }

  // Content-Length of path; -1 if the request failed, -4 if the server
  // did not send a length.
  int file_size(string path)
  {
    TRY {
      mapping h=get_headers(path);
      if(!h) return -1;
      string l=h["content-length"];
      if(!l) return -4;
      debug(3,"file_size(%s) = %s\n",path,l);
      return (int)l;
    } UNTIL_DONE;
  }

  // HTTP is read-only here: no checksums, directories or removal.
  int cksum(string file) { return 0; }
  int mkdir(string file) { return -1; }
  string *get_dir(string file) { return 0; }
  int rm(string file) { return -1; }

  // hostdata is handed to decode_host() with 80 as the default port.
  void create(string hostdata)
  {
    [ user, pw, host, port, int bufsize ] = decode_host(hostdata,0,0,80,0);
  }

  // No globbing over HTTP.
  string *expand(string *s) { return s; }

  // Prefers the file name from a Content-Disposition header, if any.
  string basename(string path)
  {
    TRY {
      mapping h=get_headers(path);
      if(h && h["content-disposition"])
      {
        if(sscanf(h["content-disposition"],"%*sfilename=\"%s\"",string filename))
        {
          return ::basename(filename);
        }
      }
      return ::basename(path);
    } UNTIL_DONE;
  }
}
#endif

// File system that accepts full URLs and dispatches every call to the
// file system fsopen() selects for that URL. This is the object served
// by --pcp-server-mode.
class ProxyFS
{
  inherit VFileSystem;

  string query_fs_type() { return "proxy"; }
  string mkurl(string path) { return "proxy:"+path; }

  // Callable that forwards a call by name to the file system of `path'.
  class FakeFunction
  {
    string fun_name;
    void create(string f) { fun_name=f; }
    mixed `()(mixed path, mixed ... args)
    {
      VFileSystem fs;
      [fs,path]=fsopen(path);
      return fs[fun_name](path,@args);
    }
  }

  // Generates a forwarding method X with return type Y.
#define FAKE(Y,X) \
  Y X(string path, mixed ... args) { \
    VFileSystem fs; \
    [fs,path]=fsopen(path); \
    return fs->X(path,@args); \
  }

  FAKE(int,file_size)
  FAKE(int,rm)
  FAKE(int,cksum)
  FAKE(int,mkdir)
  FAKE(Summary,recursive_sizeof)

#undef FAKE

  // Expands each pattern on its own file system and returns full URLs.
  string *expand(string *paths)
  {
    string *ret=({});
    foreach(paths, string path)
    {
      VFileSystem fs;
      [fs,path]=fsopen(path);
      foreach(fs->expand( ({path}) ), string expanded)
        ret+=({ fs->mkurl(expanded) });
    }
    return ret;
  }

  Stdio.File open(string file, string mode, void|int pos)
  {
    VFileSystem fs;
    [fs,file]=fsopen(file);
    return fs->open(file,mode,pos);
  }

  array(string) get_dir(string dir)
  {
    VFileSystem fs;
    [fs,dir]=fsopen(dir);
    return fs->get_dir(dir);
  }

  string combine_path(string dir, string file)
  {
    VFileSystem fs;
    [fs,dir]=fsopen(dir);
    return fs->mkurl(fs->combine_path(dir,file));
  }

  string basename(string path)
  {
    VFileSystem fs;
    [fs,path]=fsopen(path);
    return fs->basename(path);
  }

  string dirname(string path)
  {
    VFileSystem fs;
    [fs,path]=fsopen(path);
    return fs->mkurl(fs->dirname(path));
  }

  // Wire format used by bincatout()/bincatin() on stdout/stdin:
  //   %4c  offset the opened file is positioned at
  //   then blocks of  %4c length + data, terminated by a zero length.

#if constant(thread_create)
  // Streams `file' to stdout. A reader thread fills a bounded fifo so
  // that reading and writing overlap; if the source read fails the
  // thread reopens the file and fast-forwards to where it was.
  int bincatout(string file, string mode, int pos)
  {
    Thread.Fifo fifo=Thread.Fifo(fifo_size);
    Stdio.File f=open(file,mode,pos);
    if(!f)
    {
      debug(2,"bincatout: open(%s,%s,%d) failed.\n",file,mode,pos);
      exit(1);
    }
    debug(3,"bincatout open(%s,%s,%d), pos=%d\n",file,mode,pos,f->tell());
    write("%4c",f->tell());

    thread_create(lambda(Stdio.File f, Thread.Fifo fifo,
                         string file, string mode, int pos)
                  {
                    while(1)
                    {
                      string s=f->read(BLOCK_SIZE,1);
                      if(!s)
                      {
                        // Read error: reconnect, backing off more the
                        // more data is still queued for the writer.
                        while(!(f=open(file,mode,pos)))
                          sleep(10+fifo->size()/10);
                        while(f->tell() < pos)
                          f->read(min(pos - f->tell(),65536));
                        continue;
                      }
                      if(!strlen(s)) break;
                      fifo->write(s);
                      pos+=strlen(s);
                      debug(4,"FIFO %d/%d\n",fifo->size(),fifo_size);
                    }
                    // 0 tells the writer loop below that we are done.
                    fifo->write(0);
                  },f,fifo,file,mode,pos);

    while(string s=fifo->read())
    {
      debug(4,"BINCATOUT, writing %d bytes\n",strlen(s));
      int written=write("%4c%s",strlen(s),s);
      add_sent(s);
      if(written!=strlen(s)+4) exit(1);
    }
    write("%4c",0);
    return 0;
  }

  // Receives blocks from stdin and writes them to `file'.
  int bincatin(string file, string mode, int pos)
  {
    object in=Stdio.File("stdin");
    Stdio.File f=open(file,mode,pos);
    if(!f)
    {
      debug(2,"bincatin: open(%s,%s,%d) failed.\n",file,mode,pos);
      exit(1);
    }
    debug(3,"bincatin open(%s,%s,%d), pos=%d\n",file,mode,pos,f->tell());
    write("%4c",f->tell());
    while(1)
    {
      string tmp=in->read(4);
      if(strlen(tmp||"")!=4)
      {
        debug(2,"bincatin: Read block length failed.\n");
        exit(1);
      }
      sscanf(tmp,"%4c",int len);
      debug(4,"BINCATIN, reading %d bytes\n",len);
      if(!len)
      {
        debug(2,"bincatin: Got EOF.\n");
        break;
      }
      string s=in->read(len);
      if(strlen(s||"")!=len)
      {
        debug(2,"bincatin: Failed to read input.\n");
        exit(1);
      }
      int written=f->write(s);
      add_sent(s);
      if(written!=len)
      {
        debug(2,"bincatin: Failed to wrie output.\n");
        exit(1);
      }
    }
    return 0;
  }
#else
  // Single-threaded variant of bincatout(): read a block, send a block.
  int bincatout(string file, string mode, int pos)
  {
    Stdio.File f=open(file,mode,pos);
    if(!f)
    {
      debug(2,"bincatout: open(%s,%s,%d) failed.\n",file,mode,pos);
      exit(1);
    }
    debug(3,"bincatout open(%s,%s,%d), pos=%d\n",file,mode,pos,f->tell());
    write("%4c",f->tell());
    while(1)
    {
      string s=f->read(BLOCK_SIZE,1);
      if(!s || !strlen(s)) break;
      debug(4,"BINCATOUT, writing %d bytes\n",strlen(s));
      int written=write("%4c%s",strlen(s),s);
      add_sent(s);
      if(written!=strlen(s)+4) exit(1);
    }
    write("%4c",0);
    return 0;
  }

  // Single-threaded variant of bincatin().
  int bincatin(string file, string mode, int pos)
  {
    object in=Stdio.File("stdin");
    Stdio.File f=open(file,mode,pos);
    if(!f)
    {
      debug(2,"bincatin: open(%s,%s,%d) failed.\n",file,mode,pos);
      exit(1);
    }
    debug(3,"bincatin open(%s,%s,%d), pos=%d\n",file,mode,pos,f->tell());
    write("%4c",f->tell());
    while(1)
    {
      string tmp=in->read(4);
      if(strlen(tmp||"")!=4) exit(1);
      sscanf(tmp,"%4c",int len);
      debug(4,"BINCATIN, reading %d bytes\n",len);
      if(!len) break;
      // Block until the whole block has arrived, like the threaded
      // variant does. A "not all" read may legally return fewer bytes
      // on a pipe, which the length check below treated as fatal.
      string s=in->read(len);
      if(strlen(s||"")!=len) exit(1);
      int written=f->write(s);
      add_sent(s);
      if(written!=len) exit(1);
    }
    return 0;
  }
#endif

  // Number of blocks the bincatout() reader thread may queue.
  int fifo_size=10;

  void create(int blocks)
  {
    if(blocks) fifo_size=blocks;
  }
};

// host -> 1 if the remote pcp server answered, -1 if it did not.
mapping has_pcp_cache=([]);

// "proto:host" -> file system object already created for it.
mapping(string:VFileSystem) fscache=([]);

// Return type of get_smartremotefs_if_possible(); the function name and
// body follow immediately below.
VFileSystem
get_smartremotefs_if_possible(string host, void|array(string) proxies)
{
  // Returns a SmartRemoteFS for host if the remote side passes test(),
  // otherwise 0. The outcome is remembered in has_pcp_cache so each host
  // is probed only once. `proxies' overrides the global ssh_proxies list
  // for this connection.
  if(has_pcp_cache[host]==-1) return 0;

  string cent="ssh-proxy:"+host;
  object o=fscache[cent];
  if(!o) o=fscache[cent]=SmartRemoteFS(host);
  o->local_ssh_proxies=proxies || ssh_proxies;
  if(has_pcp_cache[host]==1) return o;

  if(o->test())
  {
    has_pcp_cache[host]=1;
    return o;
  }

  has_pcp_cache[host]=-1;
  // Do not leave a destructed object behind in the cache.
  m_delete(fscache,cent);
  destruct(o);
  return 0;
}

// Splits a URL into ({ file system object, path on that file system }).
// A path without "proto:" is a local file; an unknown proto is taken as
// a host name ("host:path" means ssh). Exits on malformed URLs.
mixed fsopen(string path)
{
  string proto, host;
  if(!sscanf(path,"%s:%s",proto, path))
    proto="file";

  switch(proto)
  {
    case "file":
      sscanf(path,"//%s",path);
      return ({LocalFS(),path});

    case "proxy":
      return ({ProxyFS(0),path});

    default:
      // "host:path" shorthand: rewrite to ssh://host/path.
      path="//"+proto+"/"+path;
      proto="ssh";

    case "ssh-proxy":
    case "ssh-raw":
    case "ssh":
    case "ftp":
    case "http":
      if(sizeof(ssh_proxies))
      {
        // Let the last proxy host do the whole transfer if it runs pcp.
        if(object o=get_smartremotefs_if_possible(ssh_proxies[-1],
                                                  ssh_proxies[..sizeof(ssh_proxies)-2]))
        {
          return ({o,proto+":"+path});
        }
      }

      if(!sscanf(path,"//%s",path))
      {
        debug(2,"URL: %s\n",path);
        werror("Url format: "+proto+"://host/path\n");
        exit(1);
      }

      if(!sscanf(path,"%s/%s",host,path))
      {
        host=path;
        path="";
      }

      string cent=proto+":"+host;
      // NOTE(review): a SmartRemoteFS cached under "ssh:host" is returned
      // here with a bare path, while the first lookup below returns
      // "file:"+path - confirm that both forms are equivalent remotely.
      if(fscache[cent]) return ({ fscache[cent], path });

      switch(proto)
      {
        case "ftp":
          return ({fscache[cent]=FTPFS(host), path});

        case "ssh":
          if(object tmp=get_smartremotefs_if_possible(host))
          {
            return ({ fscache[cent]=tmp,"file:"+path });
          }else{
            return ({fscache[cent]=RemoteFS(host), path});
          }

        case "ssh-raw":
          return ({fscache[cent]=RemoteFS(host), path});

        case "ssh-proxy":
          return ({ fscache[cent]=SmartRemoteFS(host),path });

        case "http":
          return ({ fscache[cent]=HTTPFS(host),path });
      }
  }
  werror("\nfsopen failed!\n");
  exit(1);
}

// Progress snapshot: transfer rate in bytes/s, bytes done and bytes in
// total. The string methods format these for the status line.
class Status
{
  float bps=-1.0;
  float done=-1.0;
  float howmuch=-1.0;

  void create(float|int b,float|int d,float|int h)
  {
    bps=(float)b;
    done=(float)d;
    howmuch=(float)h;
  }

  // Remaining kilobytes.
  string left()
  {
    return ""+(int)((howmuch-done)/1024.0);
  }

  string done_of_howmuch()
  {
    return sprintf("%d/%d",(int)(done/1024.0),(int)(howmuch/1024.0));
  }

  string kbps()
  {
    return sprintf("%2.2f",bps/1024.0);
  }

  // Estimated time left as "MmSSs", "HhMMm" or "DdHHh"; "" if unknown.
  // The former switch used the overlapping ranges 0.., 3600.. and
  // 86400.., which left the hour and day formats unreachable behind the
  // first range; test the largest unit first instead.
  string eta()
  {
    if(bps<=0.0) return "";
    int seconds=(int)((howmuch-done)/bps);
    if(seconds<0) return "";
    if(seconds>=86400)
      return sprintf("%dd%02dh",seconds/86400,(seconds/3600)%24);
    if(seconds>=3600)
      return sprintf("%dh%02dm",seconds/3600,(seconds/60)%60);
    return sprintf("%dm%02ds",seconds/60,seconds%60);
  }
};

// Base name of the file currently shown on the status line.
string base;

void update_file(VFileSystem from_fs, string from,
                 VFileSystem to_fs, string to,
                 string b)
{
  base=b;
}

// Redraws the status line. file and all may be 0 to leave out the
// per-file and the overall part respectively.
void update_status(Status file, Status all, string message)
{
  string output=" "+base;
  if(file)
    output+=sprintf(" %s kb %s kb/s",file->done_of_howmuch(),file->kbps());
  if(all)
  {
    // werror("%O %O\n",all->howmuch, all->done);
    output+=sprintf(" left: %s %s",all->left(),all->eta());
  }
  if(message) output+=" "+message;
  // status_line() is called with sprintf-style arguments elsewhere in
  // this file; never hand it a file name as the format string.
  status_line("%s",output);
}

// Copies one file, symlink or directory tree from from_fs:from to
// to_fs:to. All work happens in create(); partially transferred files
// are resumed and interrupted transfers are retried.
class low_copy
{
  VFileSystem from_fs;
  string from;
  VFileSystem to_fs;
  string to;
  int size;            // size of the source, or a negative type code
  string base;
  int old_start=-10;   // `start' at the previous attempt
  int retries=0;       // consecutive attempts without progress
  int start=-1;        // bytes already present at the target
  Bps file_bps=Bps();
  int restart_from_scratch=0;

  // Overall progress if a total is known, else progress of this file.
  // Returns 0 while no rate can be computed.
  Status eta()
  {
    float my_bps;
    float left;
    float total;
    if(total_size!=-1.0)
    {
      total=(float)total_bytes;
      left=total_size - total_bytes - start;
      float tid=time(base_time)-first_bps_time;
      if(tid<=0.0) return 0;
      my_bps=bytes/tid;
    }else{
      left=(float)(size-start);
      my_bps=bps;
      total=(float)size;
    }
    if(my_bps<=0.0)
    {
      debug(2,"eta(): ZERO BPS\n");
      return 0;
    }
    return Status(my_bps,total-left ,total);
  }

  void refresh(void|string msg)
  {
    update_status(Status(bps, start, size), eta(), msg);
  }

#if constant(alarm)
#define ALARM_DELAY 5
  int alarm_counter;

  // SIGALRM handler: refreshes the status line every ALARM_DELAY
  // seconds and throws "timeout" after about five minutes without the
  // guarded operation finishing.
  void alarm_func()
  {
    debug(1,"\nALARM\n");
    int timeout=alarm_counter++ > 5 * 60 / ALARM_DELAY;
    add_sent(0);
    update_status(Status(bps, start, size), eta(), timeout&&"timeout");
    // status_line(" %s %d/%d kb %1.2f kb/s%s%s",base,start/1024,size/1024,bps/1024.0,eta(),timeout?" timeout":"");
    alarm(ALARM_DELAY);
    if(timeout) throw(({"timeout",backtrace()}));
  }

  // START_ALARM() ... END_ALARM() guard a blocking call. They must be
  // used inside a loop: on timeout END_ALARM() executes `break'.
#define START_ALARM() \
  debug(3,"ALARM activated\n"); \
  alarm_counter=0; \
  alarm(ALARM_DELAY); \
  signal(signum("SIGALRM"),alarm_func); \
  { mixed _foo = catch {

#define END_ALARM() \
  }; \
  alarm_counter=0; \
  alarm(0); \
  signal(signum("SIGALRM"),lambda(){}); \
  debug(3,"ALARM deactivated\n"); \
  if(arrayp(_foo) && sizeof(_foo) && _foo[0]=="timeout") break; if(_foo) throw(_foo); }
#else
#define START_ALARM()
#define END_ALARM()
#endif

  // name_combined: non-zero if _to already names the target itself and
  // not a directory to place the source in.
  void create(VFileSystem _from_fs, string _from,
              VFileSystem _to_fs, string _to,
              int name_combined)
  {
    from_fs=_from_fs;
    from=_from;
    to_fs=_to_fs;
    to=_to;

    foreach(except, Regexp r)
      if(r->match(from))
        return;

    // FIXME: move the tests to the filesystems themselves
    if(optimize && !(to_fs->query_fs_type()=="file" ||
                     from_fs->query_fs_type()=="file"))
    {
      // Neither end is local: let a remote pcp do the transfer itself.
      if(to_fs->query_fs_type()=="ssh-proxy")
        if(!to_fs->do_optimize(({from_fs->mkurl(from),to})))
          return;

      // The source host is asked to push, so the request has to go to
      // from_fs (it used to be sent to to_fs, which need not be an
      // ssh-proxy at all here).
      if(from_fs->query_fs_type()=="ssh-proxy")
        if(!from_fs->do_optimize(({from,to_fs->mkurl(to)})))
          return;
    }

    base=from_fs->basename(from);
    debug(2,"From = %s, To = %s, base = %s\n",from,to,base);
    update_file(from_fs,from,to_fs,to,base);
    update_status(0,0,0);
    // status_line(" %s ",base);

    switch(size=from_fs->file_size(from))
    {
      case -1:
        werror("No such file "+from+".\n");
        return;

      case -3: /* Link */
      {
        int target=to_fs->file_size(to,1);
        if(target==-2)
        {
          if(name_combined)
          {
            werror("Target already exists as a directory.\n");
            exit(1);
          }
          // Place the link inside the existing directory and look at
          // what, if anything, is in the way there. (This used to fall
          // straight through to rm() on the new, possibly nonexistent,
          // path.)
          to=to_fs->combine_path(to,base);
          name_combined=1;
          target=to_fs->file_size(to,1);
          if(target==-2)
          {
            werror("Target already exists as a directory.\n");
            exit(1);
          }
        }
        if(target==-3 || target>=0)
        {
          if(!to_fs->rm(to))
          {
            werror("Failed to remove file/previos symlink.\n");
            exit(1);
          }
        }

        if(string as=from_fs->readlink(from))
        {
          if(to_fs->symlink(as,to))
          {
            update_status(0,0," -> "+as+"\n");
            return;
          }else{
            output("\nFailed to create symlink (%s -> %s)\n",to,as);
            exit(1);
          }
        }else{
          output("\nFailed to read symlink.\n");
          exit(1);
        }
      }

      case -2: /* Directory */
        if(!recurse) return;
        update_status(0,0,":");
        // status_line(" %s : ",base);
        debug(2,"\nDirectory found, descending.\n");
        if(!name_combined)
        {
          debug(2,"Combinining target %s and %s\n",to,base);
          to=to_fs->combine_path(to,base);
          name_combined=1;
          update_file(from_fs,from,to_fs,to,base);
        }

        switch(to_fs->file_size(to))
        {
          default:
            werror("Directory %s already exists as a file.\n",to);
            exit(1);

          case -1:
            debug(2,"\nCreating target directory %s.\n",to);
            // NOTE(review): retries forever, without delay, for as long
            // as the directory cannot be created.
            while(1)
            {
              to_fs->mkdir(to);
              switch(to_fs->file_size(to))
              {
                default:
                  werror("Directory %s already exists as a file.\n",to);
                  exit(1);

                case -2:
                  debug(2,"\nMkdir succeded.\n");
                  break;

                case -1:
                  werror("Mkdir %s failed.\n",to);
                  continue;
              }
              break;
            }
            break;

          case -2:
            debug(2,"\nTarget directory already exists.\n");
            break;
        }

        string *tmp=from_fs->get_dir(from);
        if(!tmp)
        {
          update_status(0,0,": permission denied\n");
          return;
        }
        sort(tmp);
        update_status(0,0,sprintf(": %d files/directories\n",sizeof(tmp)));
        // status_line(" %s : %d files/directories\n",base,sizeof(tmp));
        foreach(tmp, string file)
        {
          debug(2,"Recusively copying %s\n",file);
          low_copy(from_fs,from_fs->combine_path(from,file),
                   to_fs,to_fs->combine_path(to,file),
                   1);
        }
        return;

      default:
    }

    // Regular file: (re)try until the target is complete.
    while(1)
    {
      start=to_fs->file_size(to);
      switch(start)
      {
        case -2:
          if(!name_combined)
          {
            to=to_fs->combine_path(to,base);
            // Remember that the name has been appended; leaving this at
            // 0 kept appending `base' for every nested directory.
            name_combined=1;
            continue;
          }
          werror("Target already exists as a directory.\n");
          exit(1);

        case -1:
          /* FIXME, a nonexistant file is not the same as a zero-length file! */
          /* Consider setting restart_from_scratch here */
          break;

        case -4:
        case -3:
          werror("Target already exists as special (device) file.\n");
          exit(1);
      }

      if(start < old_start)
      {
        status("\nFILE SHRUNK from %d to %d bytes\n",old_start,start);
      }

      debug(1,"\nSTART = %d, SIZE=%d\n",start,size);
      if(start>=size)
      {
        // Target looks complete: verify it, or detect an oversized file.
        int wrong;
        if(start > size && size>=0)
        {
          wrong++;
        }else{
          if(verify)
          {
            int fck, tck;
            if(fck=from_fs->cksum(from))
              tck=to_fs->cksum(to);

            if(fck && tck)
            {
              if(tck != fck) wrong++;
            }else{
              switch(to_fs->verify(to))
              {
                case 0:
                  wrong++;
                  break;

                case 1:
                  debug(1,"Single-sided verify success!\n");
                  break;

                case -1:
                  if(from_fs->file_size(from) != to_fs->file_size(to))
                    wrong++;
              }
            }
            // werror("Verify: %d?=%d %d?=%d\n",ck1,ck2,len1,len2);
          }
        }

        if(wrong)
        {
          update_status(0,0,"Verify failed\n");
          // status_line(" Verify failed!\n");
          // exit(0);
          // to_fs->rm(to);
          restart_from_scratch=1;
          start=0;
        }else{
          break;
        }
      }

      // Move the tests to the filesystems themselves
      if(optimize &&
         from_fs->query_fs_type()=="ftp" &&
         to_fs->query_fs_type()=="ftp")
      {
        if(from_fs->do_optimize(from,to_fs,to))
          continue;
      }

      // The do/while(0) only exists to give END_ALARM() something to
      // break out of; tmptmp stays 0 if the open timed out.
      int tmptmp=0;
      Stdio.File f;
      do {
        START_ALARM();
        f=from_fs->open(from,"r",max(start,0));
        END_ALARM();
        tmptmp=1;
      }while(0);

      if(!tmptmp)
      {
        update_status(0,0,sprintf("open timeout"));
        continue;
      }

      if(!f)
      {
        werror("Open failed (file does not exist?)\n");
        break;
      }

      // The source could not seek: read and discard up to `start'.
      while(f->tell() < start)
      {
        string skipped;
        update_status(0,0,sprintf(">> %d bytes",start-f->tell()));
        debug(1,"Forwarding manually %d bytes\n",start-f->tell());
        START_ALARM();
        skipped=f->read(min(start - f->tell(),BLOCK_SIZE),1);
        END_ALARM();
        // EOF or read error: tell() will never reach `start', so stop
        // instead of spinning here forever.
        if(!skipped || !strlen(skipped)) break;
      }

      if(f->tell() < start)
      {
        update_status(0,0,">> timeout");
        continue;
      }

      // Back off when repeated attempts make no progress.
      if(start == old_start)
      {
        switch(retries++)
        {
          case 10..:
            sleep(300);
            break;

          case 2..:
            sleep(retries*20);
            break;

          case 0..:
            break;
        }
      }else{
        retries=0;
      }
      old_start=start;
      update_status(Status(bps, start, size),eta(),0);
      // status_line(" %s %d/%d kb %1.2f kb/s%s",base,start/1024,size/1024,bps/1024.0,eta());

      Stdio.File data;
      if(restart_from_scratch)
        data=to_fs->open(to,"wct",0);
      else
        data=to_fs->open(to,"caw");

      if(!data)
      {
        werror("Failed to open target %s for writing.\n",to);
        exit(1);
      }

      /* File now exists (and if just created, at position zero) */
      if(start<0) start=0;

      while(1)
      {
        string s;
        int written;
        START_ALARM();
        s=f->read(BLOCK_SIZE,1);
        END_ALARM();
        if(!s || !strlen(s))
        {
          debug(4,"low_copy: read failed %O\n",s);
          break;
        }
        debug(4,"low_copy: Got %d bytes\n",strlen(s));
        add_sent(s,refresh);
        start+=strlen(s);
        START_ALARM();
        written=data->write(s);
        END_ALARM();
        if(written!=strlen(s))
        {
          debug(1,"\nWrite failed (%d != %d, %s), reconnecting.\n",written,strlen(s),(string)strerror(data->errno()));
          break;
        }
        update_status(Status(bps, start, size),eta(),0);
        // status_line(" %s %d/%d kb %1.2f kb/s%s",base,start/1024,size/1024,bps/1024.0,eta());
      }
      destruct(data);
      destruct(f);
    }
    update_status(Status(file_bps->bps(), start, size),eta(),"Done\n");
    // status_line(" %s %d/%d kb (%1.2f kb/s) Done\n",base,start/1024,size/1024,file_bps->bps()/1024.0);
    total_bytes+=size;
  }
}

// Expands one source URL into the list of things to copy.
ToDo *preprocess(string from, int no_summaries)
{
  array(ToDo) ret=({});
  VFileSystem from_fs;
  status_line("Expanding %s",from);
  [from_fs, from] = fsopen(from);
  string *files;
  files=({from});
  if(search(from,"?")!=-1 || search(from,"*")!=-1)
    files=from_fs->expand(files);

  // Drop everything matched by an --except pattern.
  files=Array.filter(files,lambda(string s)
                     {
                       foreach(except, Regexp r)
                         if(r->match(s))
                           return 0;
                       return 1;
                     });

  array(Summary) summaries;
  if(no_summaries)
    summaries=allocate(sizeof(files));
  else
    summaries=from_fs->map(files,"recursive_sizeof");

  /* NOTE(review): this copy of the file is damaged from here on. The
   * text between "e" and "read(BLOCK_SIZE,1)" is missing, i.e. the rest
   * of preprocess() and at least the beginning of another definition.
   * The remaining tokens are kept exactly as found; restore this region
   * from the upstream source before building. */
  for(int e=0;eread(BLOCK_SIZE,1)) {
      if(!strlen(s)) break;
      int written=write(s);
      if(written!=strlen(s)) break;
    }
  }
};

// Parses the options shared by all invocations, sets the corresponding
// globals and records in `args' the options to pass on to a remote pcp.
// xopts: extra option descriptions from the caller; options not handled
// here are returned for the caller to deal with.
array parse_options(array(string) argv, void|array xopts)
{
  /* Debugging options */
  array ret=({});

#if constant(signal)
  signal(signum("SIGTERM"),lambda()
         {
           throw(({"timeout",backtrace()}));
         });

  // This handler is installed for SIGQUIT; say so in the message.
  signal(signum("SIGQUIT"),lambda()
         {
           master()->handle_error( ({"\nSIGQUIT received, printing backtrace and continuing.\n",backtrace() }) );
         });
#endif

  if(!xopts) xopts=({});

  foreach(Getopt.find_all_options(argv,aggregate(
    ({"speed",Getopt.HAS_ARG,({"-s","--speed"}),"PCP_SPEED"}),
    ({"except",Getopt.HAS_ARG,({"-v","--except"}),"PCP_AVOID"}),
    ({"block size",Getopt.HAS_ARG,({"-b","--block-size"}),"PCP_BLOCK_SIZE"}),
    ({"recur",Getopt.NO_ARG,({"-r","--no-recurse"}),"PCP_NO_RECURSE"}),
    ({"debug",Getopt.MAY_HAVE_ARG,({"-d","--debug"}),"PCP_DEBUG"}),
    ({"verify",Getopt.NO_ARG,({"-V","--verify"}),"PCP_VERIFY"}),
    ({"optimize",Getopt.NO_ARG,({"--no-optimize","-O"}),"PCP_NO_OPTIMIZE"}),
    ({"glob",Getopt.NO_ARG,({"--no-glob","-g"}),"PCP_NO_GLOB"}),
    ({"compression",Getopt.HAS_ARG,({"--compression","-C"}),"PCP_COMPRESSION"}),
    ({"interactive",Getopt.NO_ARG,({"--interactive","-i"}),"PCP_INTERACTIVE"}),
    ({"ssh-proxy",Getopt.HAS_ARG,({"--ssh-proxy"}),"PCP_PROXY"}),
    ({"user",Getopt.HAS_ARG,({"-l","--user"}),"PCP_USER"}),
    ({"passwd",Getopt.HAS_ARG,({"-p","--passwd"}),"PCP_PASSWD"}),
    )+xopts),array opt)
  {
    switch(opt[0])
    {
      default:
        ret+=({opt});
        break;

      case "user":
        default_user=opt[1];
        args+=({"-l"+opt[1]});
        break;

      case "passwd":
        default_passwd=opt[1];
        args+=({"-p"+opt[1]});
        break;

      case "compression":
        args+=({"-C"+opt[1]});
        compression=(int)opt[1];
        break;

      case "speed":
        args+=({"-s"+opt[1]});
        read_speed=1024.0 * ( (float)opt[1] + 0.00025 );
        break;

      case "except":
        args+=({"-v"+opt[1]});
        except+=({Regexp(opt[1])});
        break;

      case "recur":
        args+=({"-r"});
        recurse=!recurse;
        break;

      case "debug":
        if(!opt[1] || opt[1]=="")
          debug_level++;
        else
          debug_level=(int)opt[1];
        args+=({"-d"+debug_level});
        break;

      case "verify":
        args+=({"-V"});
        verify++;
        break;

      case "optimize":
        args+=({"-O"});
        optimize=0;
        break;

      case "glob":
        args+=({"-g"});
        no_expand=1;
        break;

      case "block size":
        BLOCK_SIZE=(int)opt[1];
        if(BLOCK_SIZE<1)
        {
          werror("Block size must be larger than zero.\n");
          exit(1);
        }
        args+=({"-b"+BLOCK_SIZE});
        break;

      case "ssh-proxy":
        // Several hops may be given as host1:host2:...
        ssh_proxies+=opt[1]/":";
        break;

      case "interactive":
        interactive++;
        args+=({"-i"});
        break;
    }
  }
  return ret;
}

// Terminates the forwarded option list and returns the non-option
// arguments (including argv[0]).
array(string) filter_args(array(string) argv)
{
  args+=({"--"});
  return Getopt.get_args(argv);
}

// Help text for the options handled by parse_options().
// NOTE(review): in this copy of the file the line breaks inside the
// literal (and apparently the <argument> placeholders after "=") have
// been lost; the text is kept exactly as found - restore it from the
// upstream source.
constant globalopts=#"\ -s --speed= : Limit transfer speed -v --except= : Don't fetch files/dirs matching this -b --block-size= : Read/write this many bytes at a time -C --compression=: Gz compression -r --no-recurse : don't recurse into subdirs -d --debug= : show debug messages -V --verify : verify transfers if possible -O --no-optimize : don't optimize host-to-host transfers -g --no-glob : don't do internal globbing -i --interactive : tries to use interactive tcp priority --ssh-proxy= : go through this host ";

// Line-oriented reader on top of any VFileSystem file ("-" is stdin).
class LineFile
{
  private string b="";   // buffered data
  private int bpos=0;    // offset in b of the first unread byte
  Stdio.File f;

  // path:   URL of the file to read, or "-" for stdin
  // copier: object whose fsopen() resolves the URL
  void create(string path, object copier)
  {
    if(path=="-")
    {
      f=Stdio.stdin;
    }else{
      VFileSystem fs;
      [fs, path]=copier->fsopen(path);
      f=fs->open(path,"r",0);
      if(!f)
      {
        werror("Failed to open %s.\n",path);
        exit(1);
      }
    }
  }

  // Drops consumed data from the buffer and appends one more block.
  // Returns 0 at end of file or on read error.
  inline private static nomask int get_data()
  {
    string s;
    b = b[bpos .. ];
    bpos=0;
    s = f->read(BLOCK_SIZE,1);
    if(!s || !strlen(s))
      return 0;
    b += s;
    return 1;
  }

  // Returns the next `bytes' bytes and advances past them plus `skip'
  // more (used to step over the line terminator).
  inline private static nomask string extract(int bytes, int|void skip)
  {
    string s;
    s=b[bpos..bpos+bytes-1];
    bpos += bytes+skip;
    return s;
  }

  // Returns the next line without its newline, or 0 at end of file.
  string gets()
  {
    int p,tmp=0;
    // tmp is the amount of buffered data already searched, so that it
    // is not scanned again after more data has been appended.
    while((p=search(b, "\n", bpos+tmp)) == -1)
    {
      tmp=strlen(b)-bpos;
      if(!get_data())
      {
        // End of file: a last line without a trailing newline used to
        // be dropped silently; hand it out before reporting EOF.
        if(bpos<strlen(b))
          return extract(strlen(b)-bpos);
        return 0;
      }
    }
    return extract(p-bpos, 1);
  }
}

// Program entry point: parses options, then either serves requests on
// stdin/stdout (--pcp-server-mode), lists the sources (--list) or
// copies all sources to the last argument.
int main(int argc, string *argv)
{
  string path,from_file;
  int list,no_summary;
  CopyThread copier=CopyThread();

  // NOTE(review): "proxy" (--pcp-server-mode) reads the environment
  // variable PCP_PROXY, the same one parse_options() uses for
  // --ssh-proxy - confirm that this is intended.
  foreach(parse_options(argv,aggregate(
    ({"list",Getopt.NO_ARG,({"--list"})}),
    ({"version",Getopt.NO_ARG,({"--version"})}),
    ({"quiet",Getopt.NO_ARG,({"-q","--quiet"}),"PCP_QUIET"}),
    ({"proxy",Getopt.MAY_HAVE_ARG,({"--pcp-server-mode"}),"PCP_PROXY"}),
    ({"from-file",Getopt.HAS_ARG,({"-T","--from-file"})}),
    ({"no-summary",Getopt.NO_ARG,({"--no-summary"})}),
    )),mixed opt)
  {
    switch(opt[0])
    {
      case "no-summary":
        no_summary++;
        break;

      case "from-file":
        no_summary++;
        from_file=opt[1];
        break;

      case "quiet":
        args+=({"-q"});
        quiet=!quiet;
        break;

      case "list":
        args+=({"--list"});
        list=1;
        break;

      case "version":
        write("%s\n", ID);
        exit(0);

      case "proxy":
      {
        // Server mode: after announcing the protocol version, read
        // length-prefixed encoded requests ({ method, args... }) from
        // stdin, call the method on a ProxyFS and write back the
        // length-prefixed encoded result.
        int kb_cache=(int)opt[1];
        object codec=Codec();
        quiet=1;
        debug(1,"Server started\n");
        write(PROTOCOL_VERSION);
        object fs=copier->ProxyFS(kb_cache);
        while(string s=Stdio.stdin->read(4))
        {
          if(strlen(s)!=4) exit(0);
          int len;
          if(!sscanf(s,"%4c",len)) exit(1);
          mixed data=Stdio.stdin->read(len);
          data=decode_value(data,codec);
          debug(2,"Serving %s\n",data[0]);
          data=encode_value(`->(fs,data[0])(@data[1..]),codec);
          write("%4c%s",strlen(data),data);
        }
        exit(1);
      }
    }
  }

  argv=filter_args(argv);

  // At least one source and a destination are required; --list and
  // --from-file need one argument less.
  if(sizeof(argv)<3- ((list||from_file)&&1) )
  {
    // NOTE(review): the line breaks inside this literal have been lost
    // in this copy of the file; kept exactly as found.
    werror( #"PCP version: "+ID+#" Copyright Fredrik Hübinette. This program may be freely copied under the terms of the GNU General Public Licence (GPL). Usage: pcp [options] from [from from from] to From and to are can be any of: ftp://[user[:pass]@]host/path : ftp transfer http://host[:port]/path : http transfer ssh://[user@]host/path or host:path : ssh transfer file:/path or path : local files ssh-proxy://[user@]host/URL : re-routed transfers Options: -q --quiet : don't show progress --list : list files --version : print the pcp version -T --from-file : read files to copy from 'file' --no-summary : don't compute total download time "+globalopts);
    exit(0);
  }

  // trace(1);
  debug(1,"DEBUG=%d VERSION=%s\n",debug_level,ID);

  VFileSystem from_fs,to_fs;

  status_line("Expanding");
  // With --list every argument is a source; otherwise the last one is
  // the destination.
  array data=`+( ({}),@Array.map(argv[1..sizeof(argv)-2+list],copier->preprocess,no_summary));
  erase_status_line();

  if(!no_summary)
  {
    total_size=`+(0.0, @ (array(float)) data->summary->bytes);
    status("To copy: %d Kb, %d files, %d dirs.\n",
           (int)(total_size/1024),
           `+(0,@data->summary->files),
           `+(0,@data->summary->dirs));
  }

  if(list)
  {
    foreach(data, ToDo x)
    {
      foreach(except, Regexp r)
      {
        if(r->match(x->path))
        {
          x=0;
          break;
        }
      }
      if(!x) continue;
      write("%8d %s\n",x->summary->bytes,x->from_fs->mkurl(x->path));
    }
    exit(0);
  }

  string to=argv[-1];
  [to_fs, to]=copier->fsopen(to);

  int isdir=to_fs->file_size(to)==-2;
  debug(2,"Destination isdir: %d\n",isdir);
  if(sizeof(argv)!=3 && !isdir)
  {
    werror("Destination is not a directory.\n");
    exit(1);
  }

#if 0
  if(optimize && to_fs->query_fs_type()=="ssh-proxy")
  {
    argv[-1]=to;
    if(!to_fs->do_optimize(to_fs, args+argv[1..]))
      return 0;
  }
#endif

  foreach(data, ToDo x)
  {
    int save_total_bytes=total_bytes;
    // low_copy(x->from_fs,x->path,to_fs,
    //          isdir?to_fs->combine_path(to,basename(x->path)):to);
    copier->low_copy(x->from_fs,x->path,to_fs,to,0);
    // Trust the precomputed summary over what low_copy() added up.
    if(x->summary)
      total_bytes=save_total_bytes+x->summary->bytes;
  }

  if(from_file)
  {
    LineFile f=LineFile(from_file, copier);
    // No total is known for files listed this way.
    total_size=-1.0;
    while(string url=f->gets())
    {
      if(!strlen(url)) continue;
      if(url[0]=='#') continue;
      // preprocess() takes two arguments; summaries are not wanted
      // here since --from-file implies --no-summary.
      foreach(copier->preprocess(url,1), ToDo x)
      {
        copier->low_copy(x->from_fs,x->path,to_fs,to,0);
      }
    }
  }

  status("Done.\n");
  return 0;
}