#!/usr/local/bin/pike
// Copyright Fredrik Hübinette, 1998
// Distribution policy: GPL
// TODO list:
// make --ssh-proxy use ssh-proxy: when possible
// make ssh-proxy: fallback on --ssh-proxy
// implement --ssh-proxy for FTPFS
// use alarm() more to make sure we don't get stuck anywhere
constant ID="$Id: pcp,v 1.24 1999/05/26 20:46:42 hubbe Exp $";
#define PROTOCOL_VERSION "0001"
// How much is read/written at once
int BLOCK_SIZE=2048;
// How many seconds should be counted into the 'current bps'
float BPS_SECONDS=60.0;
// This gives more debug messages
int debug_level;
// No status messages
int quiet;
// Perform verification
int verify;
// Recurse into subdirectories
int recurse=1;
// Max bps transferred over network
float read_speed=1000000000.0;
// Array of things *not* to copy
array(Regexp) except=({});
// Optimize host to host transfers?
int optimize=1;
// Arguments given to this command
string *args=({"pcp"});
// Don't expand globs
int no_expand;
// Compression?
int compression=-1;
// Total size of files to transfer
float total_size=-1.0;
// Total bytes accounted for
int total_bytes=0;
// Hosts to redirect through
string *ssh_proxies=({});
// Use high TCP priority?
int interactive=0;
// What user to connect as
string default_user;
// What passwd to use
string default_passwd;
#define error(X) throw( ({ X, backtrace() }) )
#define retry() error("retry")
#define is_retry(X) (arrayp(X) && sizeof(X) && X[0]=="retry")
// Start a child process, trying again for as long as the attempt fails
// with an error message that ends in ":11\n".  Any other failure ends
// the loop and the function returns whatever was created (possibly 0).
Process.create_process create_process(array(string) args, mapping options)
{
  object child;
  while(1)
  {
    mixed failure=catch {
      child=Process.create_process(args,options);
    };
    if(!failure) break;
    // Only the ":11\n" suffix (checked on the reversed message) is retried.
    if(reverse(failure[0])[..3]!="\n11:") break;
  }
  return child;
}
// Running totals for a set of paths: byte count, file count and
// directory count.  Two summaries can be added with `+`.
class Summary
{
  int bytes;
  int files;
  int dirs;

  // b = bytes, f = files, d = directories.
  void create(int b,int f, int d)
  {
    [ bytes, files, dirs ] = ({ b, f, d });
  }

  // Field-wise sum; returns a new Summary and leaves both operands alone.
  Summary `+(Summary x)
  {
    int total_b=bytes+x->bytes;
    int total_f=files+x->files;
    int total_d=dirs+x->dirs;
    return Summary(total_b, total_f, total_d);
  }
};
// Codec handed to encode_value()/decode_value(): it turns Summary
// objects into the text "Summary(b,f,d)" and back.  Anything else is
// answered with 0.
class Codec
{
  // Rebuild an object from the name produced by nameof().
  object objectof(string s)
  {
    string ob, args;
    sscanf(s,"%s(%s)",ob,args);
    if(ob!="Summary")
      return 0;   // unknown name; the original "Cannot decode" throw is disabled
    int b, f, d;
    sscanf(args,"%d,%d,%d",b,f,d);
    return Summary(b,f,d);
  }

  // Produce the textual name for x, or 0 when x is not a Summary.
  string nameof(mixed x)
  {
    if(object_program(x)!=Summary)
      return 0;   // the original "Cannot encode" throw is disabled
    return sprintf("Summary(%d,%d,%d)",x->bytes,x->files,x->dirs);
  }
};
class VFileSystem
{
// Operations declared here as prototypes only; the file systems that
// inherit VFileSystem supply the bodies.
string query_fs_type();
string mkurl(string path);
Stdio.File open(string file, string mode, void|int pos);
int file_size(string file, void|int lnk);
int cksum(string file);
int rm(string file);
array(string) get_dir(string dir);
int mkdir(string dir);
// Optional operations.  These defaults do nothing: chmod, readlink and
// symlink return 0, file_mode returns -1.
int chmod(string file, int mode) { return 0; };
string readlink(string path) { return 0; };
int symlink(string path, string to) { return 0; };
int file_mode(string file) { return -1; }
// Join a directory and a file name with exactly one "/" between them.
//
// An empty, 0, "." or "./" dir yields file unchanged (or "." when file
// is empty-ish too); an empty-ish file yields dir unchanged.
//
// Fix: the old loop compared a three character slice against "//" and
// then kept the tail instead of the head, so it hung on dir=="//" and
// never collapsed anything else.  It also indexed dir before the 0
// check below could run.
string combine_path(string dir, string file)
{
  // Collapse a run of trailing slashes down to a single one.
  while(dir && strlen(dir)>=2 && dir[strlen(dir)-2..]=="//")
    dir=dir[..strlen(dir)-2];

  if(!dir || dir=="" || dir=="." || dir=="./")
  {
    if(!file || file=="" || file=="." || file=="./")
      return ".";
    else
      return file;
  }
  if(!file || file=="" || file=="." || file=="./") return dir;
  if(dir[-1]=='/')
    return dir+file;
  return dir+"/"+file;
  // predef::combine_path(dir,file) is deliberately not used here.
}
// Return the last "/"-separated component of path, ignoring one
// trailing slash.
//
// Fix: an empty (or 0) path used to fail on path[-1]; it now yields "".
string basename(string path)
{
  if(!path || !strlen(path)) return "";
  if(path[-1]=='/') path=path[..sizeof(path)-2];
  return (path/"/")[-1];
}
// Return path without its last component (one trailing slash is
// ignored).  "", "." and "/" have no parent and give 0; a path directly
// below the root gives "/".
string dirname(string path)
{
  switch(path)
  {
    case "":
    case ".":
    case "/":
      return 0;
  }
  if(path[-1]=='/') path=path[..strlen(path)-2];
  array(string) parts=path/"/";
  int count=sizeof(parts);
  // ({ "", name }) means "/name": the parent is the root itself.
  if(count==2 && parts[0]=="") return "/";
  return parts[..count-2]*"/";
}
// Default integrity check: always answers -1.  RemoteFS and LocalFS
// override this and return 0 or 1 for gzip-style files.
int verify(string file)
{
return -1;
}
// Walk path and total up what is below it.  A size of 0 or more counts
// as one file of that size, -2 is a directory that is descended into,
// and every other negative size contributes nothing.
Summary recursive_sizeof(string path)
{
  status_line("Counting %s",path);
  int size=this_object()->file_size(path);

  if(size>=0)
    return Summary(size,1,0);
  if(size!=-2)
    return Summary(0,0,0);

  Summary total=Summary(0,0,0);
  // get_dir() may return 0; treat that as an empty directory.
  foreach(this_object()->get_dir(path)||({}),string entry)
    total+=recursive_sizeof(combine_path(path,entry));
  return total;
}
// Glob-expand path_elements starting at index pos.
// NOTE(review): the first statement of this function is truncated in
// this copy of the file (text between a '<' and a '>' appears to have
// been lost), so the loop header, the declarations of base, tmp and ret
// and the brace structure cannot be read reliably.  From the part that
// survives: the directory listing of base is matched with glob()
// against the element at pos, each match is substituted and expanded
// recursively, a 0 from the recursion is propagated, and a path with
// nothing left to expand is returned as a one-element array.
string *low_expand(string *path_elements, int pos)
{
for(;posmkurl(path_elements[..pos]*"/"));
if(base=="") base=".";
debug(3,"low_expand: isdir(%s)\n",base);
// Only directories can be listed; anything else expands to nothing.
if(this_object()->file_size(base)!=-2) return ({});
debug(3,"low_expand: get_dir(%s)\n",base);
// Try the listing up to 20 times before giving up.
for(int retries=0;retries<20;retries++)
{
if(tmp=this_object()->get_dir(base))
break;
}
if(!tmp) return ({});
string save=path_elements[pos];
debug(1,"Globbing %O against '%s'\n",tmp,save);
foreach(glob(save,tmp),string tmp2)
{
// Substitute the match, recurse, then restore the pattern.
path_elements[pos]=tmp2;
tmp=low_expand(path_elements,pos+1);
path_elements[pos]=save;
if(!tmp) return 0;
ret+=tmp;
}
return ret;
}
}
debug(3,"low_expand -> %s\n",path_elements*"/");
return ({path_elements*"/"});
}
// Expand glob patterns in every entry of paths.  Returns paths as given
// when the global no_expand is set, the concatenated matches otherwise,
// or 0 as soon as one pattern fails to expand.
string *expand(string *paths)
{
  if(no_expand) return paths;

  debug(2,"Expanding %O\n",paths);
  string *expanded=({});
  foreach(paths, string pattern)
  {
    string *matches=low_expand(pattern/"/",0);
    if(matches)
    {
      expanded+=matches;
      continue;
    }
    debug(2,"Expand failed.\n");
    return 0;
  }
  debug(2,"Expanded to: %O\n",expanded);
  // Clear whatever low_expand left on the status line.
  status_line("");
  status_line("");
  return expanded;
}
// Apply the method of this object named func to every element of foo,
// passing data as extra arguments, and return the results.
array map(array foo, string func,mixed ... data)
{
return Array.map(foo,`->(this_object(),func),@data);
}
}
// Split "user:pw@host:port" / "host[bufsize]" style host strings.
// The user, pw, port and bufsize arguments are the values returned for
// any part the string does not contain.  Returns
// ({ user, pw, host, port, bufsize }).
array(int|string) decode_host(string host, string user, string pw, int port, int bufsize)
{
  // Scanning the reversed string makes the LAST "@" the separator.
  int has_login=sscanf(reverse(host),"%s@%s",host,user);
  if(!has_login)
  {
    if(default_user) user=default_user;
    if(default_passwd) pw=default_passwd;
  }
  else
  {
    user=reverse(user);
    host=reverse(host);
    // No ":pw" in the login part: fall back to the global default.
    if(!sscanf(user,"%s:%s",user,pw) && default_passwd)
      pw=default_passwd;
  }
  sscanf(host,"%s:%d",host,port);
  sscanf(host,"%s[%d]",host,bufsize);
  return ({ user, pw, host, port, bufsize });
}
// VFileSystem variant that remembers what directory listings said.
// NOTE(review): the text between get_end_of_ls_line() and the body of
// what RemoteFS/FTPFS call parse_directory() is truncated in this copy
// (line starting "for(int e=0;e"), so those two functions cannot be
// read completely here.
class CacheVFileSystem
{
inherit VFileSystem;
// path -> size as recorded from a listing (-2 is stored for entries
// that also get a "path/" key, see below).
mapping(string:int) flcache=([]);
// path -> link target, filled from " -> target" listing lines.
mapping(string:string) lnk=([]);
// path -> mode as returned by parse_mode() on the first listing column.
mapping(string:int) modecache=([]);
static string get_end_of_ls_line(string line)
{
string *elems=line/" ";
int count=0;
if(line[0]=='c') count--;
for(int e=0;e %s",f,string link))
{
debug(2,"\nLINK[%s%s]=%s\n",dir,f,link);
lnk[dir+f]=link;
}
break;
default: continue;
case '-': size=(int)x[4];
}
int mode=parse_mode(x[0]);
ret+=({f});
debug(2,"\nFLCACHE[%s%s]=%d\n",dir,f,size);
flcache[dir+f]=size;
modecache[dir+f]=mode;
if(size==-2)
{
// Also record the entry under its name with a trailing slash.
debug(2,"\nFLCACHE[%s%s/]=%d\n",dir,f,size);
flcache[dir+f+"/"]=size;
modecache[dir+f+"/"]=mode;
}
}
return sort(ret);
}
// Return the cached link target when file_size(file,1) reports -3;
// otherwise nothing is returned.
string readlink(string file)
{
if(file_size(file,1)==-3)
return lnk[file];
}
}
// Internal stuff for add_sent
int bytes;
float bps=1.0; // Acceptable guess
int base_time=time();
#if 0
// One sample for the disabled (#if 0) add_sent(): a byte count and the
// time it was recorded.
class TimeStamp
{
int bytes;
float time;
};
array(TimeStamp) timestamps=allocate(1000);
int first_timestamp;
int last_timestamp;
int add_sent_bytes;
float first_bps_time=-1.0;
float last_block_size_change=0.0;
// Disabled (#if 0) variant of add_sent() that keeps a window of
// TimeStamp samples no older than BPS_SECONDS and derives bps from it.
// NOTE(review): the block-size adjustment near the end is truncated in
// this copy (the line starting "if(10*BLOCK_SIZE128"), which also leaves
// the closing braces unbalanced.
void add_sent(int|string b, void|function updater)
{
float current_time;
// First call only records the starting time.
if(first_bps_time==-1)
{
last_block_size_change=first_bps_time=time(base_time);
return;
}
while(1)
{
if(stringp(b)) b=strlen(b);
bytes+=b;
current_time=time(base_time);
// Drop samples that have aged out of the window.
while(first_timestamp < last_timestamp &&
current_time - timestamps[first_timestamp]->time > BPS_SECONDS)
{
add_sent_bytes-=timestamps[first_timestamp]->bytes;
timestamps[first_timestamp]=0;
first_timestamp++;
}
// Buffer full: move the live samples to the front and extend it.
if(last_timestamp==sizeof(timestamps))
{
timestamps=timestamps[first_timestamp..]+allocate(last_timestamp-first_timestamp);
last_timestamp-=first_timestamp;
first_timestamp=0;
}
timestamps[last_timestamp]=TimeStamp();
timestamps[last_timestamp]->time=current_time;
timestamps[last_timestamp]->bytes=b;
last_timestamp++;
add_sent_bytes+=b;
if(first_timestamp < last_timestamp)
{
float t=current_time - timestamps[first_timestamp]->time;
int byt=add_sent_bytes-timestamps[first_timestamp]->bytes;
debug(3,"TIMESTAMPS: %d dt %f db %d\n",
last_timestamp-first_timestamp,
t,
byt);
if(t>0.0)
{
bps=byt/t;
}else{
debug(2,"No delta time2\n");
bps=0.0;
}
// Over the limit: sleep, then go round again with b=0.
if(bps>read_speed)
{
float sleep_time=b/read_speed-t;
debug(3,"\nSleeping %f seconds.\n",sleep_time);
sleep(sleep_time);
b=0;
continue;
}
}else{
debug(2,"No delta time\n");
bps=0.0;
}
break;
}
if(current_time-last_block_size_change > BPS_SECONDS)
{
if(10*BLOCK_SIZE128 && BLOCK_SIZE>bps)
{
BLOCK_SIZE/=2;
last_block_size_change=current_time;
debug(1,"BLOCK_SIZE=%d\n",BLOCK_SIZE);
}
}
}
}
#else
float last_bps_time=-1.0;
float first_bps_time=-1.0;
float last_block_size_change=0.0;
// Routines to keep track of how much data is sent over the network
// Account for b bytes (or strlen(b) when b is a string) having been
// transferred, update the decaying bps estimate, and sleep when the
// estimate exceeds read_speed.  When updater is given it is called
// about once a second while sleeping.
// NOTE(review): the block-size adjustment near the end is truncated in
// this copy (the line starting "if(10*BLOCK_SIZE128"), which also leaves
// the closing braces unbalanced.
void add_sent(int|string b, void|function updater)
{
// First call only records the starting time.
if(first_bps_time==-1)
{
last_block_size_change=first_bps_time=last_bps_time=time(base_time);
return;
}
if(stringp(b)) b=strlen(b);
float t=time(base_time);
// Averaging window: grows with elapsed time, capped by BPS_SECONDS.
float bpssec=max(0.0,min(BPS_SECONDS, (t-first_bps_time)/3.0))+1.0;
float bps_frac=1.0-1.0/bpssec;
bytes+=b;
debug(3,"\nBPS=%f %f %f %f %d",bps,bps_frac,t-last_bps_time,t,b);
// Decay the old estimate by the elapsed time, then add the new bytes.
bps = bps * pow(bps_frac,(float)(t-last_bps_time)) + b/bpssec;
debug(3," BPS=%f\n",bps);
last_bps_time=t;
if(bps > read_speed)
{
// Time needed for the estimate to decay down to read_speed.
float wakeup_time=log(read_speed/bps) / log(bps_frac);
debug(3,"\nSleeping %f seconds.\n",wakeup_time);
if(updater)
{
wakeup_time+=t;
while(1)
{
float f=wakeup_time - time(base_time);
if(f<1.1)
{
sleep(f);
break;
}else{
sleep(1.0);
updater();
}
}
// NOTE(review): by the time the loop above exits, wakeup_time has
// normally been reached, so this second loop looks redundant.
while(time(base_time) < wakeup_time)
{
sleep( min(1.0, wakeup_time - t));
updater();
}
}else{
sleep(wakeup_time);
}
// Re-apply the decay for the time spent sleeping.
t=time(base_time);
float bpssec=max(0.0,min(BPS_SECONDS, (t-first_bps_time)/3.0))+1.0;
bps = bps * pow(1.0-1.0/bpssec,(float)(t-last_bps_time));
last_bps_time=t;
}
if(t-last_block_size_change > BPS_SECONDS/2.0)
{
if(10*BLOCK_SIZE128 && BLOCK_SIZE>bps)
{
BLOCK_SIZE/=2;
last_block_size_change=t;
debug(1,"BLOCK_SIZE=%d\n",BLOCK_SIZE);
}
}
}
}
#endif
// Stopwatch for throughput: remembers the global byte counter and the
// time at creation, and bps() reports bytes per second since then.
class Bps
{
  float t=time(base_time);
  int b=bytes;

  // Average rate since this object was created; 0.0 when the division
  // fails (for example when no time has passed yet).
  float bps()
  {
    float rate=0.0;
    catch {
      rate=(bytes-b) / (time(base_time)-t);
    };
    return rate;
  }
}
// Internal variable, tells whether we need to output a newline
// before proceeding or not.
int need_newline;
int last_status_line_length;
// Format a message and write it to stderr, keeping track of whether the
// cursor is left in the middle of a line.
//
// A leading "\n" is dropped when the previous output already ended a
// line; a leading "\r" is written as is.  After writing, need_newline
// says whether the text did NOT end in "\n", and the remembered status
// line length is reset whenever a line was completed.
//
// Fix: an empty message (or one that is just "\n" when no newline is
// pending) used to fail on s[0] / s[-1]; it is now a no-op.
void output(string fmt, mixed ... args)
{
  string s=sprintf(fmt,@args);
  if(!strlen(s)) return;

  if(s[0]=='\n' && !need_newline)
  {
    s=s[1..];
    if(!strlen(s)) return;   // nothing left once the newline is dropped
  }

  werror(s);
  need_newline=(s[-1]!='\n');
  if(!need_newline)
    last_status_line_length=0;
}
// Print a debug message when level does not exceed the global
// debug_level.
void debug(int level, string fmt, mixed ... args)
{
  if(level > debug_level) return;
  output(fmt,@args);
}
// Print a status message unless the global quiet flag is set.
void status(string fmt, mixed ... args)
{
  if(quiet) return;
  output(fmt,@args);
}
// Rewrite the current terminal line with a status message (nothing is
// printed when quiet is set).  The last line of the message is cut to
// its final 76 characters behind a "<<" marker when longer than 78, and
// the first line is padded with spaces so it covers the previous status
// line.
void status_line(string fmt, mixed ... args)
{
  if(quiet) return;

  array(string) rows=sprintf(fmt, @args)/"\n";

  string tail=rows[-1];
  if(strlen(tail)>78)
  {
    tail="<<"+tail[strlen(tail)-76..];
    rows[-1]=tail;
  }
  // Remember the length before padding is applied.
  int tail_length=strlen(tail);

  int missing=last_status_line_length-strlen(rows[0]);
  if(missing>0)
    rows[0]+=" "*missing;

  output("\r"+rows*"\n");
  last_status_line_length=tail_length;
}
// Blank out the status line.  An empty status line is written twice:
// the first pads over the old text, the second leaves the remembered
// length at zero.
void erase_status_line()
{
  for(int pass=0;pass<2;pass++)
    status_line("");
}
// Quote s for a shell command line by putting a backslash in front of
// every character outside the set -a-zA-Z0-9_@./
string sh_quote(string s)
{
  array(string) pieces=({});
  string ok;
  int c;
  // Each round peels off a run of safe characters plus one unsafe one.
  while(sscanf(s,"%[-a-zA-Z0-9_@./]%c%s",ok,c,s)==3)
    pieces+=({ sprintf("%s\\%c",ok,c) });
  // Whatever is left contains safe characters only.
  return pieces*""+s;
}
// One pending copy job: the file system to read from, the path on it,
// and the Summary describing it.
class ToDo
{
  VFileSystem from_fs;
  string path;
  Summary summary;

  // fs = source file system, f = path on fs, s = summary for that path.
  void create(VFileSystem fs, string f, Summary s)
  {
    [ from_fs, path, summary ] = ({ fs, f, s });
  }
};
class CopyThread
{
// File system reached by running shell commands on a remote host
// through ssh (dd, cat, ls, wc, cksum, rm, mkdir, gzip).
//
// Changes from the previous version:
//  - safe_remote_popen() / safe_remote_system() now pause before every
//    retry; they used to loop without any delay when read() returned 0.
//  - rm() drops the cached size, mode and link target of the removed
//    file, so a truncating open() no longer reports the old size as its
//    starting position.
class RemoteFS
{
  inherit CacheVFileSystem;

  // Host name and optional login name, split from "user@host" in create().
  string host;
  string user;
  // Copy of the global ssh proxy list taken when the object is created.
  array(string) local_ssh_proxies=ssh_proxies;

  string query_fs_type() { return "ssh"; }

  // Build the ssh:// URL for path on this host.
  string mkurl(string path)
  {
    string h=host;
    if(user) h=user+"@"+host;
    return "ssh://"+h+"/"+path;
  }

  // A Stdio.File connected to the stdin/stdout of an ssh child process
  // that runs one command on the remote host.
  class Remote
  {
    string file;   // remote path, used to invalidate flcache on write
    int pos;       // bytes read so far (plus the initial offset)
    inherit Stdio.File : data;
    object proc;
    // Whether destroy() should wait for the child; cleared by read().
    int waitforproc=1;

    // Backslash-escape characters the intermediate shell would interpret.
    string my_quote_sh_cmd(string s)
    {
      return replace(s,
                     ({"(",")","`","\"","\\","'","$","&"}),
                     ({"\\(","\\)","\\`","\\\"","\\\\","\\'","\\$","\\&"}));
    }

    // Start "ssh [proxy ssh ...] host cmd" with this object as one end
    // of a bidirectional pipe and the child holding the other.
    void create(string cmd)
    {
      Stdio.File x=pipe(Stdio.PROP_BIDIRECTIONAL);
      string *ssh_cmd=({"ssh"});
      switch(compression)
      {
        case 0: ssh_cmd+=({"-o","Compression no"}); break;
        default: ssh_cmd+=({"-C","-o","CompressionLevel "+compression});
        case -1:   // leave compression to the ssh configuration
      }
      debug(2,"local proxies: %O\n",local_ssh_proxies);
      foreach(local_ssh_proxies,string host)
      {
        // Each hop adds one more shell that will parse the command.
        cmd=my_quote_sh_cmd(cmd);
        if(!interactive) ssh_cmd+=({"-x"});
        if(debug_level>2) ssh_cmd+=({"-v"});
        ssh_cmd+=({host,"ssh"});
      }
      if(user) ssh_cmd+=({"-l",user});
      if(debug_level>2) ssh_cmd+=({"-v"});
      if(!interactive) ssh_cmd+=({"-x"});
      // ssh_cmd+=({"-a"});
      debug(2,"Remote: %O\n",ssh_cmd+({host,cmd}));
      proc=create_process(ssh_cmd+({host,cmd}),
                          (["stdin":x,"stdout":x]));
      // add_sent(1024);
    }

    // Same as Stdio.File.read() but keeps pos up to date.
    string read(void|int x, void|int y)
    {
      string ret;
      switch(query_num_arg())
      {
        case 2:
          ret=data::read(x,y);
          break;
        case 1:
          ret=data::read(x);
          break;
        case 0:
          ret=data::read();
      }
      pos+=strlen(ret||"");
      waitforproc=0;
      return ret;
    }

    // Same as Stdio.File.write(); the cached size of the file is dropped
    // because it is no longer valid.
    int write(string s)
    {
      waitforproc=1;
      int ret=data::write(s);
      m_delete(flcache,file);
      return ret;
    }

    int tell() { return pos; }
    int wait() { return proc->wait(); }
    int status() { return proc->status(); }

    // Close the pipe and, after writing, wait for the child to finish.
    void destroy() {
      if(_fd) close();
      if(proc && waitforproc)
      {
        // kill(proc,signum("SIGTERM"));
        proc->wait();
      }
    }
  }

  // Open a remote file.  "r" reads through dd starting at the block that
  // contains pos; the truncating write modes remove the file first and
  // then fall through to the appending modes, which write through
  // "cat >>".  Other modes return 0.
  Stdio.File open(string file, string mode, void|int pos)
  {
    switch(mode)
    {
      case "r":
        Remote data=Remote("dd " "2>/dev/null "+
                           " ibs="+BLOCK_SIZE+
                           " obs="+BLOCK_SIZE+
                           " skip="+(pos/BLOCK_SIZE)+
                           " if="+sh_quote(file));
        // dd can only skip whole blocks, so start at a block boundary.
        data->pos=pos - pos%BLOCK_SIZE;
        data->file=file;
        return data;
      case "wct":
      case "wtc":
      case "twc":
      case "tcw":
      case "cwt":
      case "ctw":
        if(pos) return 0;
        if(!rm(file))
          return 0;
      case "cwa": case "caw": case "wca":
      case "acw": case "wac": case "awc":
        Remote data=Remote("/bin/cat >>"+sh_quote(file));
        data->pos=file_size(file);
        data->file=file;
        return data;
    }
  }

  // Run cmd remotely and return its output.  The command is bracketed by
  // "Connected" / "Disconnected" markers; when either is missing the
  // attempt is repeated after 10 seconds.
  string safe_remote_popen(string cmd)
  {
    while(1)
    {
      object o=Remote("echo Connected;"+cmd+";echo Disconnected");
      if(string tmp=o->read())
      {
        tmp-="\r";
        if(tmp[..9]=="Connected\n" && tmp[strlen(tmp)-13..]=="Disconnected\n")
          return tmp[10..strlen(tmp)-14];
      }
      // Back off before every retry, including after a failed read.
      sleep(10);
    }
  }

  // Run cmd remotely and return the value of wait() on the ssh process.
  // Repeated after 10 seconds until the "Connected" marker is seen.
  int safe_remote_system(string cmd)
  {
    while(1)
    {
      object o=Remote("echo Connected;"+cmd);
      if(string tmp=o->read())
      {
        tmp-="\r";
        if(tmp[..9]=="Connected\n")
          return o->wait();
      }
      // Back off before every retry, including after a failed read.
      sleep(10);
    }
  }

  // "ls -l" (plus extra flag letters) on the remote side.
  string do_ls(string where,void|string xflags)
  {
    return safe_remote_popen("/bin/ls -l"+(xflags||"")+" "+sh_quote(where));
  }

  // List dir ("." when empty) and feed the listing to the cache.
  array(string) get_dir(string dir)
  {
    string tmp=do_ls((dir&&strlen(dir))?dir:".");
    // add_sent(tmp);
    return parse_directory(dir,tmp);
  }

  // Size from the cache; on a miss the parent directory (or the entry
  // itself, when it has no parent) is listed first.  -1 when unknown.
  int low_file_size(string file)
  {
    if(file=="") file=".";
    if(!zero_type(flcache[file]))
      return flcache[file];
    if(string dir=dirname(file))
    {
      get_dir(dir);
    }else{
      parse_directory("",do_ls(file,"d"),1);
    }
    if(!zero_type(flcache[file]))
      return flcache[file];
    return -1;
  }

  // Size of file.  With lnk set the raw cached value is returned; without
  // it, entries recorded as -3 or -4 are measured with "wc -c".
  int file_size(string file, void|int lnk)
  {
    int size=low_file_size(file);
    if(lnk) return size;
    switch(size)
    {
      /* Link or special file */
      case -3:
      case -4:
        string tmp=safe_remote_popen("wc -c "+sh_quote(file));
        if(sscanf(tmp,"%d ",size))
          return flcache[file]=size;
      default: return size;
    }
  }

  // Remote cksum value; 1 is returned when the output parses as 0.
  int cksum(string file)
  {
    string tmp=safe_remote_popen("cksum "+sh_quote(file));
    // add_sent(tmp);
    return ((int)tmp) || 1;
  }

  // Remove file; non-zero on success.  Cached data about the file is
  // dropped so later size lookups ask the remote side again.
  int rm(string file)
  {
    int ok=!safe_remote_system("rm "+sh_quote(file));
    if(ok)
    {
      m_delete(flcache,file);
      m_delete(modecache,file);
      m_delete(lnk,file);
    }
    return ok;
  }

  // Create a directory; non-zero on success.
  int mkdir(string file)
  {
    return !safe_remote_system("mkdir "+sh_quote(file));
  }

  // Run "gzip -t" on .gz / .tgz / .Z files: 1 when it succeeds, 0 when
  // it fails, -1 for every other extension.
  int verify(string path)
  {
    if(sscanf(reverse(path),"%s.",string ext))
    {
      switch(reverse(ext))
      {
        case "gz":
        case "tgz":
        case "Z":
          return !safe_remote_system("gzip -t "+sh_quote(path));
      }
    }
    return -1;
  }

  // h is "host" or "user@host".
  void create(string h)
  {
    host=h;
    sscanf(host,"%s@%s",user,host);
  }
}
// File system backed by the local disk through Pike's predef functions.
//
// Fix: file_mode() applied "& 07777" to the whole file_stat() result;
// it now masks the mode field (index 0, next to the size at index 1
// that low_file_size() reads).
class LocalFS
{
  inherit VFileSystem;

  string query_fs_type() { return "file"; }

  string mkurl(string path)
  {
    return "file://"+path;
  }

  // Open file in the given Stdio.File mode and seek to pos; 0 on failure.
  Stdio.File open(string file, string mode, void|int pos)
  {
    Stdio.File f=Stdio.File();
    if(!f->open(file,mode)) return 0;
    f->seek(pos);
    return f;
  }

  // file_stat() with "" mapped to "." and "dir/" mapped to "dir/.".
  mixed low_file_stat(string file,int lnk)
  {
    if(file=="") file=".";
    if(file[-1]=='/') file+=".";
    return file_stat(file,lnk);
  }

  // Size field of the stat result, or -1 when the file cannot be stat'ed.
  int low_file_size(string file,int lnk)
  {
    mixed s=low_file_stat(file,lnk);
    if(!s) return -1;
    return s[1];
  }

  // Size of file.  With lnk set the raw stat size is returned; without
  // it, a -4 entry is measured with "wc -c".
  int file_size(string file,int|void lnk)
  {
    int size=low_file_size(file,lnk);
    if(lnk) return size;
    switch(size)
    {
      /* Link or special file */
      case -3: return -3;
      case -4:
        string tmp=Process.popen("wc -c "+sh_quote(file));
        if(sscanf(tmp,"%d ",size))
          return size;
      default: return size;
    }
  }

  // cksum value of file; 1 is returned when the output parses as 0.
  int cksum(string file)
  {
    return ((int)Process.popen("cksum "+sh_quote(file))) || 1;
  }

  int rm(string file)
  {
    if(file[-1]=='/') file=file[..sizeof(file)-2];
    return predef::rm(file);
  }

  int mkdir(string file)
  {
    if(file[-1]=='/') file=file[..sizeof(file)-2];
    return predef::mkdir(file);
  }

  // 1 when predef::chmod() did not throw, 0 otherwise.
  int chmod(string file, int mode)
  {
    return !catch {
      predef::chmod(file,mode);
    };
  }

  // Permission bits of file, or -1 when it cannot be stat'ed.
  int file_mode(string file)
  {
    mixed s=low_file_stat(file,0);
    if(!s) return -1;
    return s[0] & 07777;
  }

  string readlink(string s)
  {
    return predef::readlink(s);
  }

  // 1 when predef::symlink() did not throw, 0 otherwise.
  int symlink(string s, string to)
  {
    return !catch {
      predef::symlink(s,to);
    };
  }

  array(string) get_dir(string dir)
  {
    if(!strlen(dir))
      dir=".";
    else if(dir[-1]=='/')
      dir+=".";
    return predef::get_dir(dir);
  }

  // Run "gzip -t" on .gz / .tgz / .Z files: 1 when it succeeds, 0 when
  // it fails, -1 for every other extension.
  int verify(string path)
  {
    if(sscanf(reverse(path),"%s.",string ext))
    {
      switch(reverse(ext))
      {
        case "gz":
        case "tgz":
        case "Z":
          return !Process.system("gzip -t "+sh_quote(path));
      }
    }
    return -1;
  }
}
class SmartRemoteFS
{
int buffersize=10;
inherit RemoteFS;
#define RETRY do{ debug(2,"RETRY at %d\n", __LINE__); if(ServerConn) destruct(ServerConn); retry(); }while(0)
#define TRY while(1) { if(mixed _err=catch {
#define UNTIL_DONE }) { if(!is_retry(_err)) throw(_err); sleep(1); continue; } break; }
// Type tag of this file system.
string query_fs_type() { return "ssh-proxy"; }
// Build the URL for path.  A "file:" path on a connection with a buffer
// size below 20 is turned into a plain ssh:// URL by the inherited
// mkurl(); everything else becomes ssh-proxy://[user@]host[[N]]/path,
// where [N] is present only when the buffer size is not the default 10.
//
// Fix: prefixing the user used to restart from the bare host name and
// so threw away the "[N]" suffix that create() parses back in.
string mkurl(string path)
{
  // werror("%s\n",path);
  if(sscanf(path,"file:%s", string tmp) && buffersize<20)
  {
    sscanf(tmp,"//%s",tmp);
    return ::mkurl(tmp);
  }else{
    string h=host;
    if(buffersize!=10) h+="["+buffersize+"]";
    if(user) h=user+"@"+h;
    return "ssh-proxy://"+h+"/"+path;
  }
}
// h is "[user@]host" optionally followed by "[N]"; N, when present,
// replaces the default buffersize.
void create(string h)
{
::create(h);
sscanf(host,"%s[%d]",host,buffersize);
}
Remote ServerConn;
Codec codec=Codec();
int need_to_read_version;
// Return the connection to the remote pcp server, starting the server
// first when there is none.  The remote command prints WRONG when no
// pcp is found there.
Remote start()
{
  if(!ServerConn)
  {
    array(string) parts=({ "( test \"x`type pcp 2>/dev/null`\" != x && pcp" });
    // Pass on the debug level and any speed limit.
    if(debug_level)
      parts+=({ " -d"+debug_level });
    if(read_speed < 1000000000.0)
      parts+=({ " -s"+read_speed });
    parts+=({ sprintf(" --pcp-server-mode=%d",buffersize) });
    debug(2,"\nStarting remote pcp server.\n");
    parts+=({ " ) || echo WRONG" });
    // A fresh server sends its protocol version first.
    need_to_read_version=1;
    ServerConn=Remote(parts*"");
  }
  return ServerConn;
}
// Consume the 4-byte protocol version a freshly started server sends.
// RETRY (drop the connection and throw "retry") when there is no
// connection or the version differs from PROTOCOL_VERSION.
void read_version()
{
if(need_to_read_version)
{
if(!ServerConn) RETRY;
if(ServerConn->read(4)!=PROTOCOL_VERSION)
RETRY;
need_to_read_version=0;
}
}
// Start the remote server and report whether the first four bytes it
// sends equal PROTOCOL_VERSION.  A short read triggers RETRY, which the
// TRY/UNTIL_DONE wrapper turns into another attempt after a pause.
int test()
{
TRY {
debug(2,"Testing if remoe has pcp.\n");
start();
string s=ServerConn->read(4);
if(strlen(s)!=4) RETRY;
// The version bytes have been consumed here, so read_version() must
// not try to read them again.
need_to_read_version=0;
return s==PROTOCOL_VERSION;
} UNTIL_DONE;
}
// Callable stand-in for one method: calling it sends the method name
// and arguments to the remote pcp server and returns the decoded reply.
class FakeFunction
{
string fun_name;
void create(string f) { fun_name=f; }
// Request and reply are each framed as a 4-byte length ("%4c")
// followed by an encode_value() blob.  Any short read or write
// triggers RETRY; the TRY/UNTIL_DONE wrapper then repeats the call.
mixed `()(mixed ... args)
{
TRY {
debug(2,"\nCalling fake function %s%O\n",fun_name,args);
start();
mixed data=encode_value( ({fun_name})+args,codec);
if(ServerConn->write(sprintf("%4c%s",strlen(data),data))!=strlen(data)+4)
RETRY;
read_version();
string s=ServerConn->read(4);
if(!s) RETRY;
int len;
if(!sscanf(s,"%4c",len)) RETRY;
data=ServerConn->read(len);
if(strlen(data) != len) RETRY;
data=decode_value(data,codec);
debug(2,"\nFake function %s returns: %O\n",fun_name,data);
return data;
} UNTIL_DONE;
}
}
// Like the inherited combine_path(), except that a dir ending in ":"
// is joined to file directly, without a slash.
string combine_path(string dir, string file)
{
  int ends_in_colon=strlen(dir) && dir[-1]==':';
  if(!ends_in_colon)
    return ::combine_path(dir,file);
  return dir+file;
}
// Last path component.  For a name without any "/" but with a ":",
// the part after the first ":" is returned; otherwise the inherited
// basename() decides.
string basename(string dir)
{
  if(search(dir,"/")!=-1)
    return ::basename(dir);
  if(sscanf(dir,"%*s:%s",dir))
    return dir;
  return ::basename(dir);
}
// Member lookup.  The names listed below resolve to a FakeFunction, so
// calls to them are executed by the remote pcp server; every other name
// is looked up in this object as usual.
function `->(string x)
{
switch(x)
{
case "file_size":
case "cksum":
case "get_dir":
case "rm":
case "chmod":
case "readlink":
case "mkdir":
case "verify":
case "recursive_sizeof":
case "expand":
case "map":
case "basename":
debug(2,"\nFaking function %s:%s\n",host,x);
return FakeFunction(x);
}
return this_object()[x];
}
class FakeFile
{
Remote SCon;
string ibuf="";
int pos;
int writing;
int err=0;
// Error code saved by write() when a write to the server came up short.
int errno() { return err; }
// Read one length-prefixed reply (4-byte length, then that many bytes)
// from the server connection and throw it away.  Nothing is extracted
// from the reply; the result is always 0.
int get_return_code()
{
  string header=SCon->read(4);
  int len;
  sscanf(header,"%4c",len);
  SCon->read(len);
  return 0;
}
// Read up to howmuch bytes (everything when called without arguments)
// from the length-prefixed chunk stream into ibuf and hand back the
// requested part.
// NOTE(review): the loop header is truncated in this copy (the line
// starting "while(SCon &&"), so the exact loop condition and the
// declarations of tmp and len are not visible.
string read(int|int howmuch, int|int blocking)
{
string ret;
if(writing)
error("File not open for reading.\n");
if(!query_num_arg()) howmuch=0x7fffffff;
while(SCon && (blocking?strlen(ibuf)read(4);
sscanf(tmp,"%4c",len);
debug(3,"Fakefile read() got len=%d\n",len);
// A zero length ends the stream: consume the trailing reply and give
// the connection back for reuse when nobody else has taken that slot.
if(!len)
{
get_return_code();
if(!ServerConn)
{
ServerConn=SCon;
SCon=0;
}else{
destruct(SCon);
SCon=0;
}
}else{
string tmp=SCon->read(len);
if(!tmp || strlen(tmp) != len)
{
// Short chunk: drop the connection and return what is buffered.
debug(3,"Fakefile failed read() got len=%d\n",strlen(tmp||""));
destruct(SCon);
tmp="";
break;
}
ibuf+=tmp;
}
}
// Hand out at most howmuch bytes and keep the rest buffered.
if(howmuch>=strlen(ibuf))
{
ret=ibuf;
ibuf="";
}else{
ret=ibuf[..howmuch-1];
ibuf=ibuf[howmuch..];
}
pos+=strlen(ret);
return ret;
}
// Send data to the server as one length-prefixed chunk.  Returns the
// number of payload bytes written, 0 for empty data, or -1 when the
// write came up short (the connection is then dropped and the error
// code kept for errno()).  Throws when the file is not open for
// writing.
int write(string data)
{
  if(!(writing && SCon))
    error("File not open for writing.\n");

  int size=strlen(data);
  if(!size) return 0;

  int sent=SCon->write(sprintf("%4c%s",size,data));
  if(sent==size+4)
  {
    pos+=size;
    return size;
  }

  debug(2,"FakeFile write failed %d != %d+4\n",sent,size);
  err=SCon->errno();
  // Do not wait for the ssh process when tearing the connection down.
  SCon->waitforproc=0;
  destruct(SCon);
  return -1;
}
// Take over the shared server connection for this file and read the
// 4-byte starting position the server sends.  mode "w" opens for
// writing, anything else for reading.  A short read drops the
// connection and triggers RETRY.
void create(string mode)
{
writing=mode == "w";
read_version();
// The connection belongs to this file until it is handed back.
SCon=ServerConn;
ServerConn=0;
string tmp=SCon->read(4);
if(strlen(tmp||"")!=4)
{
destruct(SCon);
RETRY;
}
sscanf(tmp,"%4c",pos);
debug(3,"Fakefile starting at %d, mode = %s\n",pos,mode);
}
// Finish with this file.  Returns 0 when it was already closed, 1
// otherwise.  A file open for writing sends a zero-length chunk, reads
// the server's reply and hands the connection back for reuse when that
// slot is free; a file open for reading simply drops the connection.
int close(void|string how)
{
  if(!SCon) return 0;

  if(!writing)
  {
    destruct(SCon);
    return 1;
  }

  int r=SCon->write(sprintf("%4c",0));
  if(r!=4)
    error(sprintf("Failed to close (%d != 4).\n",r));
  get_return_code();

  if(ServerConn)
  {
    // Somebody else already owns a server connection; drop ours.
    destruct(SCon);
  }else{
    ServerConn=SCon;
    SCon=0;
  }
  return 1;
}
// Current position: the starting offset read in create() plus the bytes
// read or written since.
int tell() { return pos; }
// Close the file when the object is destructed.
void destroy()
{
close();
}
}
// Open file through the remote pcp server.  A mode containing "r" sends
// a "bincatout" request and returns a reading FakeFile; otherwise a mode
// containing "w" sends "bincatin" and returns a writing FakeFile; any
// other mode returns 0.  A short write of the request triggers RETRY,
// which the TRY/UNTIL_DONE wrapper turns into another attempt.
Stdio.File open(string file, string mode, void|int pos)
{
TRY {
debug(2,"\nOpening %s %s %d\n",file,mode,pos);
if(search(mode,"r")!=-1)
{
mixed cmd=encode_value( ({"bincatout",file,mode,pos }), codec);
if(start()->write(sprintf("%4c%s",strlen(cmd),cmd))!=strlen(cmd)+4)
RETRY;
return FakeFile("r");
}
if(search(mode,"w")!=-1)
{
mixed cmd=encode_value( ({"bincatin",file,mode,pos }),codec);
if(start()->write(sprintf("%4c%s",strlen(cmd),cmd))!=strlen(cmd)+4)
RETRY;
return FakeFile("w");
}
return 0;
} UNTIL_DONE;
}
// Run this program's own argument list followed by cmd on the remote
// host (every word shell-quoted), pass the connection to cat(), and
// return the value of wait() on the remote process.
int do_optimize(array(string) cmd)
{
  string command_line=Array.map(args+cmd,sh_quote)*" ";
  object remote=Remote(command_line);
  cat(remote);
  return remote->wait();
}
};
mapping(string:mapping(string:int)) supports_cache=([]);
// File system that talks to an FTP server over a control connection and
// opens a data connection (PASV when supported, PORT otherwise) for
// every transfer.
//
// Changes from the previous version:
//  - do_connect() asked for a password only when pw was "pcp@a.b", but
//    the default set in this class is "pcp@", so named users were never
//    prompted.  Both values now trigger the prompt.
//  - rm() drops the cached size of a deleted file, so a truncating
//    open() no longer reports the old size as its starting position.
//  - get_dir() no longer declares an unused local.
class FTPFS
{
  inherit CacheVFileSystem;
  string host;
  int port=21;
  string user="anonymous";
  string pw="pcp@";
// RETRY drops the control connection and throws "retry"; a TRY { ... }
// UNTIL_DONE block catches that, sleeps 10 seconds and starts over.
#define RETRY do{ debug(2,"RETRY at %d\n", __LINE__); destruct(ftpserv); retry(); }while(0)
#define TRY while(1) { if(mixed _err=catch {
#define UNTIL_DONE }) { if(!is_retry(_err)) throw(_err); sleep(10); continue; } break; }
  // Control connection, 0 when not connected.
  Stdio.FILE ftpserv;
  // What the server is known to support: >0 yes, <0 no, 0 untested.
  // Shared per host string through supports_cache.
  mapping(string:int) supports=([]);

  string query_fs_type() { return "ftp"; }

  // ftp:// URL for path; port and login are included only when they
  // differ from the defaults.
  string mkurl(string path)
  {
    string h=host;
    if(port!=21) h+=":"+port;
    if(user!="anonymous" || pw!="pcp@")
    {
      string u=user || "anonymous";
      if(pw) u+=":"+pw;
      h=u+"@"+h;
    }
    return "ftp://"+h+"/"+path;
  }

  // Text of the most recent server reply, one line per "\n".
  string body;

  // Read one complete (possibly multi-line) reply into body and return
  // its numeric code.  EOF or an unparsable first line triggers RETRY.
  int ftp_answer()
  {
    body="";
    int seen_minus;
    while(1)
    {
      int i,c;
      string data;
      string s=ftpserv->gets();
      if(!s) RETRY;
      debug(2,"ftp%s< %O\n",host,s);
      if(sscanf(s,"%d%c%s",i, c, data)!=3)
      {
        // Inside a multi-line reply, lines without a code are text.
        if(seen_minus)
        {
          data=s;
          c='-';
        }else{
          RETRY;
        }
      }
      body+=data+"\n";
      if(c!='-')
        return i;
      else
        seen_minus++;
    }
  }

  // Wait for a final reply: the code for 2xx/3xx, 0 for 4xx/5xx.
  int ftp_success()
  {
    while(1)
    {
      int ret=ftp_answer();
      switch(ret/100)
      {
        case 1: continue; /* In progress - wait */
        case 2: return ret; /* Ok, done */
        case 3: return ret; /* Ok, send next command in sequence */
        case 4: return 0; /* Transient failure */
        case 5: return 0; /* Failure */
        default: RETRY; /* Buggy FTP server */
      }
    }
  }

  // Set after RETR/APPE: the transfer's final reply is still pending and
  // must be read before the next command is sent.
  int await_return_code;

  // Send one command line.  No value is returned.
  int send_cmd(string cmd)
  {
    if(await_return_code)
    {
      await_return_code=0;
      ftp_success();
    }
    debug(2,"FTP%s> %s\n",host,cmd);
    cmd+="\r\n";
    if(!ftpserv) RETRY;
    if(ftpserv->write(cmd)!=strlen(cmd))
      RETRY;
  }

  // Send cmd and return the code of the first reply, whatever it is.
  int very_low_ftpcmd(string cmd)
  {
    send_cmd(cmd);
    return ftp_answer();
  }

  // Send cmd and return the first reply code that is not 1xx.
  int low_ftpcmd(string cmd)
  {
    int ret=very_low_ftpcmd(cmd);
    while(ret/100==1) ret=ftp_answer();
    return ret;
  }

  // Send cmd and return ftp_success() for it.
  int ftpcmd(string cmd)
  {
    send_cmd(cmd);
    return ftp_success();
  }

  // Connect and log in unless already connected, then switch to binary
  // mode.  Returns 1; every failure triggers RETRY.
  int do_connect()
  {
    if(ftpserv) return 1;
    ftpserv=Stdio.FILE();
    debug(2,"FTP: Connecting to %s:%d\n",host,port);
    int x;
    if(catch {
      x=ftpserv->connect(host,port);
    })
      RETRY;
    if(!x)
    {
      werror("Failed to connect to %s:%d\n",host,port);
      RETRY;
    }
    debug(2,"FTP: Connected\n");
    if(!ftp_success())
    {
      werror("No ftp answer at %s:%d\n",host,port);
      RETRY;
    }
    debug(2,"FTP: Sending USER\n");
    int ret=ftpcmd("USER "+user);
    switch(ret/100)
    {
      default:
        werror("Login failed.\n");
        RETRY;
      case 3:
        // A named user still carrying the built-in placeholder password
        // has not supplied one: ask for it.  "pcp@a.b" is the value the
        // old check used and is still accepted.
        if(user!="anonymous" && (pw=="pcp@" || pw=="pcp@a.b"))
        {
          pw=readline(sprintf("password for %s@%s: ",user,host));
          if(!pw) exit(0);
        }
        debug(2,"FTP: Sending PASS\n");
        if(!ftpcmd("PASS "+pw))
        {
          werror("Failed to login: %s.\n",body);
          RETRY;
        }
      case 2:
    }
    debug(2,"FTP: Switching to binary mode\n");
    ftpcmd("TYPE I");
    return 1;
  }

  // Local address of the control connection, used for PORT.
  string iface;

  // Active mode: bind a local port and announce it with PORT.
  Stdio.Port low_ftpport()
  {
    Stdio.Port p=Stdio.Port();
    if(!iface)
    {
      do_connect();
      iface=(ftpserv->query_address(1)/" ")[0];
    }
    int e;
    for(e=0;e<10;e++)
      if(p->bind(0,0,iface))
        break;
    if(e==10)
    {
      iface=0;
      sleep(10);
      RETRY;
    }
    sscanf(p->query_address(),"%*s %d",int local_port);
    sscanf(iface,"%d.%d.%d.%d",int a,int b,int c,int d);
    int ret=ftpcmd(sprintf("PORT %d,%d,%d,%d,%d,%d",a,b,c,d,
                           local_port/256,local_port%256));
    if(!ret)
    {
      predef::write("Port command failed. %d %s",ret,body);
      RETRY;
    }
    return p;
  }

  // Passive mode stand-in for Stdio.Port: connects out to the server at
  // creation and hands that connection out from accept() (0 when the
  // connect failed).
  class pasv_ftpport
  {
    private int p;
    private Stdio.File o;

    Stdio.File accept()
    {
      return o;
    }

    void create(int p)
    {
      o=Stdio.File();
      debug(2,"FTP: Connecting to %s:%d\n",host,p);
      if(o->connect(host,p))
      {
        supports->PASV=1;
        debug(2,"FTP: Connection worked!\n");
      }else{
        debug(2,"FTP: Connection failed\n");
        o=0;
      }
    }
  }

  // Set up a data connection: PASV unless it is known not to work, PORT
  // otherwise.
  Stdio.Port ftpport()
  {
    if(supports->PASV >= 0)
    {
      do_connect();
      if(ftpcmd("PASV"))
        // The last two numbers of the reply are the port's high and low
        // byte, in that order.
        if(sscanf(body,"%*s(%*d,%*d,%*d,%*d,%d,%d)",int low, int high)==7)
          return pasv_ftpport(low*256 + high);
      supports->PASV=-1;
    }
    return low_ftpport();
  }

  // Run a command whose result arrives on the data connection and return
  // everything received.
  string get_ftp_data(string cmd)
  {
    string tmp="";
    Stdio.Port p=ftpport();
    if(very_low_ftpcmd(cmd)/100!=1)
    {
      predef::write("Command failed.\n");
      RETRY;
    }
    Stdio.File o=p->accept();
    if(!o) RETRY;
    while(string s=o->read(1000,1))
    {
      if(!strlen(s)) break;
      // add_sent(s);
      tmp+=s;
    }
    destruct(o);
    if(!ftp_success())
    {
      predef::write("Command failed.\n");
      RETRY;
    }
    return tmp;
  }

  // List dir ("." when empty) with "LIST -l" and feed the listing to the
  // cache.
  array(string) get_dir(string dir)
  {
    TRY {
      do_connect();
      string tmp=get_ftp_data("LIST -l "+((dir&&strlen(dir))?dir:"."));
      // add_sent(tmp);
      return parse_directory(dir,tmp);
    } UNTIL_DONE;
  }

  // Size of file: from the cache, else from SIZE (when supported and lnk
  // is not set), else from a listing of the parent directory.  -2 for a
  // path without a parent, -1 when unknown.
  int file_size(string file,void|int lnk)
  {
    TRY {
      if(!zero_type(flcache[file]))
        return flcache[file];
      if(supports->SIZE >= 0 && !lnk)
      {
        do_connect();
        int ret=low_ftpcmd("SIZE "+file);
        switch(ret)
        {
          case 213:
            supports->SIZE=1;
            if((int)body)
            {
              return flcache[file]=(int)body;
            }
            break; /* We cannot trust SIZE */
          default:
            supports->SIZE=-1;
          case 550:
            /* Directory or nonexistent */
        }
      }
      string dir=dirname(file);
      if(!dir) return flcache[file]=-2; /* Directory */
      get_dir(dir);
      if(!zero_type(flcache[file]))
        return flcache[file];
      return -1;
    } UNTIL_DONE;
  }

  // Data connection wrapper that tracks the read position and drops the
  // cached size of the file on every write.
  class Remote
  {
    string file;
    int pos;
    inherit Stdio.File : data;

    string read(void|int x, void|int y)
    {
      string ret;
      switch(query_num_arg())
      {
        case 2:
          ret=data::read(x,y);
          break;
        case 1:
          ret=data::read(x);
          break;
        case 0:
          ret=data::read();
      }
      pos+=strlen(ret||"");
      return ret;
    }

    int write(string s)
    {
      int ret=data::write(s);
      m_delete(flcache,file);
      return ret;
    }

    int tell() { return pos; }
  }

  // Open file.  "r" issues RETR (after REST when pos is given and
  // accepted); the truncating write modes delete the file first and
  // then fall through to the appending modes, which issue APPE.  Other
  // modes return 0.
  Stdio.File open(string file, string mode, void|int pos)
  {
    Stdio.Port p;
    TRY {
      do_connect();
      switch(mode)
      {
        case "r":
          p=ftpport();
          if(pos)
            if(!ftpcmd("REST "+pos))
              pos=0;
          int ret=very_low_ftpcmd("RETR "+file);
          if(ret/100!=1)
          {
            werror("RETR failed %d:%s\n",ret,body);
            RETRY;
          }
          await_return_code=1;
          break;
        case "wct":
        case "wtc":
        case "twc":
        case "tcw":
        case "cwt":
        case "ctw":
          if(pos) return 0;
          if(!rm(file))
            return 0;
        case "cwa":
        case "caw":
        case "wca":
        case "acw":
        case "wac":
        case "awc":
          pos=file_size(file);
          p=ftpport();
          ret=very_low_ftpcmd("APPE "+file);
          if(ret/100!=1)
          {
            werror("APPE failed: %d %s\n",ret,body);
            RETRY;
          }
          await_return_code=1;
          break;
        default:
          return 0;
      }
      if(!p) return 0;
      Stdio.File data=Remote();
      object x=p->accept();
      if(!x) RETRY;
      data->assign(x);
      data->file=file;
      data->pos=pos;
      return data;
    } UNTIL_DONE;
  }

  // Delete file; non-zero on success.  The cached size is dropped so
  // later size lookups ask the server again.
  int rm(string file)
  {
    TRY {
      do_connect();
      int ret=ftpcmd("DELE "+file);
      if(ret) m_delete(flcache,file);
      return ret;
    } UNTIL_DONE;
  }

  // Create a directory; non-zero on success.
  int mkdir(string dir)
  {
    TRY {
      do_connect();
      return ftpcmd("MKD "+dir);
    } UNTIL_DONE;
  }

  // Checksums are not available over FTP.
  int cksum(string file)
  {
    return 0;
  }

  // hostdata is "[user[:pw]@]host[:port]".  Connects immediately.
  void create(string hostdata)
  {
    host=hostdata;
    if(!(supports=supports_cache[hostdata]))
      supports=supports_cache[hostdata]=([]);
    // Scanning the reversed string makes the LAST "@" the separator.
    if(sscanf(reverse(host),"%s@%s",host,user))
    {
      user=reverse(user);
      host=reverse(host);
      sscanf(user,"%s:%s",user,pw);
    }
    sscanf(host,"%s:%d",host,port);
    TRY {
      do_connect();
    } UNTIL_DONE ;
  }

  int verify(string path)
  {
    return -1;
  }

  // Server-to-server copy: this server is put in passive mode and tofs
  // is told to connect to it, so the data never passes through this
  // process.  Returns 1 on success, 0 when the attempt cannot be made.
  int do_optimize(string from, FTPFS tofs, string to)
  {
    if(supports->PASV<0) return 0;
    TRY {
      int ret,a,b,c,d;
      int size=tofs->file_size(to);
      if(size==-1) size=0;
      do_connect();
      if(!ftpcmd("PASV"))
      {
        supports->PASV=-1;
        return 0;
      }
      sscanf(body,"%*s(%*d,%*d,%*d,%*d,%d,%d)",int low, int high);
      if(catch {
        string ip=ftpserv->query_address();
        debug(2,"IP: %s -> %s\n",host,ip);
        sscanf(ip,"%d.%d.%d.%d",a,b,c,d);
      }) return 0;
      if(!tofs->ftpcmd(sprintf("PORT %d,%d,%d,%d,%d,%d",a,b,c,d,low,high)))
      {
        werror("PORT failed.\n");
        return 0;
      }
      // Resume where the destination left off when the source allows it.
      if(size)
        if(!ftpcmd("REST "+size))
          size=0;
      ret=tofs->very_low_ftpcmd((size?"APPE":"STOR")+" "+to);
      if(ret/100!=1)
      {
        werror("APPE/STOR failed.\n");
        RETRY;
      }
      ret=very_low_ftpcmd("RETR "+from);
      if(ret/100!=1)
      {
        werror("RETR failed.\n");
        RETRY;
      }
      // NO, this line is NOT missing a '&' /hubbe
      // (a single '&' makes sure both replies are always read)
      if(!(ftp_success() & tofs->ftp_success()))
        RETRY;
      return 1;
    } UNTIL_DONE;
  }
};
#if 1
class HTTPFS
{
inherit VFileSystem;
string host;
int port=80;
string user;
string pw;
#define RETRY do{ debug(2,"HTTP RETRY at %d\n", __LINE__); retry(); }while(0)
#define TRY while(1) { if(mixed _err=catch {
#define UNTIL_DONE }) { if(!is_retry(_err)) throw(_err); sleep(1); continue; } break; }
string query_fs_type() { return "http"; }
string mkurl(string path)
{
string h=host;
if(port!=80) h+=":"+port;
if(user || pw)
{
string u=user || "anonymous";
if(pw) u+=":"+pw;
h=u+"@"+h;
}
return "http://"+h+"/"+path;
}
string dirname(string path)
{
if(path=="" || path=="." || path=="/") return 0;
array(string) tmp=path/"/";
return tmp[..sizeof(tmp)-2]*"/";
}
object ssh_proxy;
class Request
{
inherit Stdio.FILE : data;
int pos,code;
mapping headers=([]);
void create(string path, string mode, int wanted_pos)
{
string h=host;
int p=port;
if(sizeof(ssh_proxies))
{
if(!ssh_proxy || ssh_proxy->status())
{
Stdio.File f=Stdio.File();
array(string) cmd,ssh_cmd=({"echo OK;sleep 900"});
Stdio.File x=f->pipe(Stdio.PROP_BIDIRECTIONAL);
foreach(reverse(ssh_proxies),string host)
{
int lport=6000+random(10000);
cmd=({"ssh",sprintf("-L%d:%s:%d",lport,h,p)});
// if(h!="localhost") cmd+=({"-a"});
p=lport;
h="localhost";
if(!interactive) cmd+=({"-x"});
if(debug_level>2) cmd+=({"-v"});
cmd+=({host});
ssh_cmd=cmd+ssh_cmd;
}
ssh_cmd=ssh_cmd[1..];
cmd=({"ssh"});
switch(compression)
{
case 0: cmd+=({"-o","Compression no"}); break;
default: cmd=({"-C","-o","CompressionLevel "+compression});
case -1:
}
if(user) cmd+=({"-l",user});
cmd+=ssh_cmd;
debug(2,"HTTP: using ssh proxy cmd: %O\n",cmd);
ssh_proxy=create_process(cmd,(["stdin":x,"stdout":x]));
destruct(x);
if(f->read(2)!="OK")
RETRY;
}
}
int x;
if(catch { x=connect(h,p); } || !x)
RETRY;
debug(2,"Connected to %s:%d\n",host,port);
string req=sprintf("GET /%s HTTP/1.0\r\n"
"Host: %s\r\n"
"User-agent: pcp/%s\r\n"
"Accept: */*\r\n",
path,
host,
replace(ID," ","_"));
if(wanted_pos) req+=sprintf("Range: bytes=%d-\r\n",wanted_pos);
if(user || pw)
{
req+=sprintf("Authorization: Basic %s\r\n",
MIME.encode_base64((user||"anonymous") +":"+ pw));
}
req+="\r\n";
write(req);
debug(2,"SENT: %s",req);
string s=gets();
if(!s) RETRY;
s-="\r";
sscanf(lower_case(s),"http%s %d %s",string version, code, string rest);
debug(3,"Return line: %s\n",s);
debug(2,"Return Code: %d\n",code);
switch(code)
{
case 206:
pos=wanted_pos;
code=200;
break;
case 200:
pos=0;
}
while(1)
{
string l=gets();
if(!l) break;
l-="\r";
if(l=="") break;
if(sscanf(l,"%s: %s",string x, string y)==2)
{
debug(3,"Got header %s = %s\n",x,y);
while(y[-1]==' ')
y=y[..strlen(y)-2]+(gets()-"\r");
x=lower_case(x);
if(headers[x])
headers[x]+="\n"+y;
else
headers[x]=y;
}else{
debug(3,"Got some line: %s\n",l);
}
}
debug(3,"Headers = %O\n",headers);
}
string read(void|int x, void|int y)
{
string ret;
switch(query_num_arg())
{
case 2:
ret=data::read(x,y);
break;
case 1:
ret=data::read(x);
break;
case 0:
ret=data::read();
}
pos+=strlen(ret);
return ret;
}
// Pass the string straight on to the inherited write() and hand its
// result back to the caller.
int write(string s)
{
  // m_delete(flcache,file);
  return data::write(s);
}
// Report the position counter maintained by this object (pos).
int tell()
{
  return pos;
}
}
// Request opened by get_headers() and kept around so that a following
// open() of the same file at the same position can hand it out instead
// of issuing a second request.
Stdio.File ready_to_read;
// Path that ready_to_read was requested for; 0 when nothing is cached.
string ready_to_read_file;
// Open a file for reading.  Only mode "r" is handled.
//
// If get_headers() left a request for this very file open and it sits at
// the wanted position, that request is handed out (and forgotten here).
// Otherwise a new Request is made; it is returned only when its status
// code is 200, else 0 is returned.
Stdio.File open(string file, string mode, void|int pos)
{
  TRY {
    if(mode=="r")
    {
      if(ready_to_read_file==file && pos==ready_to_read->tell())
      {
        object cached=ready_to_read;
        ready_to_read=0;
        ready_to_read_file=0;
        return cached;
      }
      object req=Request(file,"",pos);
      return req->code==200 ? req : 0;
    }
  } UNTIL_DONE;
}
// Return the response headers for path, or 0 if the request did not
// succeed (no request object, status other than 200, or no headers).
//
// The request used to fetch the headers is kept in ready_to_read /
// ready_to_read_file so that open() can reuse it.
mapping get_headers(string path)
{
  if(!ready_to_read_file || path!=ready_to_read_file)
  {
    // Forget BOTH halves of the cache before issuing the new request.
    // If creating the Request aborts (by RETRY or any other error), a
    // stale file name paired with a zero request object would otherwise
    // make later calls index 0.
    ready_to_read=0;
    ready_to_read_file=0;
    ready_to_read=Request(path,"",0);
    if(!ready_to_read || ready_to_read->code!=200 || !ready_to_read->headers)
    {
      ready_to_read=0;
      return 0;
    }
    ready_to_read_file=path;
  }
  return ready_to_read->headers;
}
// Size of the remote file according to its Content-Length header.
// Returns -1 when no headers could be fetched and -4 when the headers
// carry no content-length entry.
int file_size(string path)
{
  TRY {
    mapping hdrs=get_headers(path);
    if(!hdrs)
      return -1;
    string length=hdrs["content-length"];
    if(length)
    {
      debug(3,"file_size(%s) = %s\n",path,length);
      return (int)length;
    }
    return -4;
  } UNTIL_DONE;
}
// The operations below are not implemented for this file system; each
// one simply returns a fixed value.

// Always 0.
int cksum(string file)
{
  return 0;
}

// Always -1.
int mkdir(string file)
{
  return -1;
}

// Always 0.
string *get_dir(string file)
{
  return 0;
}

// Always -1.
int rm(string file)
{
  return -1;
}
// Constructor: split the host specification string with decode_host()
// and store the result in user, pw, host and port.  The fifth element
// is received into a local (bufsize) and not used here.
// NOTE(review): the 80 passed as fourth argument is presumably the
// default port -- confirm against decode_host().
void create(string hostdata)
{
[ user, pw, host, port, int bufsize ] =
decode_host(hostdata,0,0,80,0);
}
// No pattern expansion is done for this file system: the list is
// returned exactly as it was given.
string *expand(string *s)
{
  return s;
}
// Name to store the file under.  When the response headers contain a
// content-disposition entry with a quoted filename="..." part, the
// inherited basename() of that file name is used; otherwise the
// inherited basename() of the path itself.
string basename(string path)
{
  TRY {
    mapping hdrs=get_headers(path);
    string disposition=hdrs && hdrs["content-disposition"];
    if(disposition &&
       sscanf(disposition,"%*sfilename=\"%s\"",string filename))
      return ::basename(filename);
    return ::basename(path);
  } UNTIL_DONE;
}
}
#endif
// File system front-end that takes complete URLs.  Every operation
// resolves its path argument through fsopen() and forwards the call to
// the file system fsopen() returned for it.
//
// bincatout()/bincatin() move file data over stdout/stdin as a sequence
// of blocks: a 4-byte position first, then for every block a 4-byte
// length followed by that many bytes, ended by a zero length.
class ProxyFS
{
  inherit VFileSystem;

  string query_fs_type() { return "proxy"; }
  string mkurl(string path) { return "proxy:"+path; }

  // Callable object: resolves its first argument with fsopen() and calls
  // the method named fun_name on the resulting file system.
  class FakeFunction
  {
    string fun_name;
    void create(string f) { fun_name=f; }
    mixed `()(mixed path, mixed ... args)
    {
      VFileSystem fs;
      [fs,path]=fsopen(path);
      return fs[fun_name](path,@args);
    }
  }

  // FAKE(type,name) defines method `name' which resolves its path with
  // fsopen() and forwards the call, extra arguments included.
#define FAKE(Y,X) \
  Y X(string path, mixed ... args) { \
    VFileSystem fs; \
    [fs,path]=fsopen(path); \
    return fs->X(path,@args); \
  }
  FAKE(int,file_size)
  FAKE(int,rm)
  FAKE(int,cksum)
  FAKE(int,mkdir)
  FAKE(Summary,recursive_sizeof)
#undef FAKE

  // Expand every path on its own file system and return the results as
  // URLs built by that file system's mkurl().
  string *expand(string *paths)
  {
    string *ret=({});
    foreach(paths, string path)
    {
      VFileSystem fs;
      [fs,path]=fsopen(path);
      foreach(fs->expand( ({path}) ), string expanded)
        ret+=({ fs->mkurl(expanded) });
    }
    return ret;
  }

  Stdio.File open(string file, string mode, void|int pos)
  {
    VFileSystem fs;
    [fs,file]=fsopen(file);
    return fs->open(file,mode,pos);
  }

  array(string) get_dir(string dir)
  {
    VFileSystem fs;
    [fs,dir]=fsopen(dir);
    return fs->get_dir(dir);
  }

  // The combined path is turned back into a URL before it is returned.
  string combine_path(string dir, string file)
  {
    VFileSystem fs;
    [fs,dir]=fsopen(dir);
    return fs->mkurl(fs->combine_path(dir,file));
  }

  string basename(string path)
  {
    VFileSystem fs;
    [fs,path]=fsopen(path);
    return fs->basename(path);
  }

  // The directory part is turned back into a URL before it is returned.
  string dirname(string path)
  {
    VFileSystem fs;
    [fs,path]=fsopen(path);
    return fs->mkurl(fs->dirname(path));
  }

#if constant(thread_create)
  // Send a file out as blocks.  A separate thread reads the file into a
  // Thread.Fifo of fifo_size entries while this thread drains the fifo
  // and writes the blocks.  When a read returns 0 the reader keeps
  // reopening the file until that succeeds and then skips forward to the
  // position reached so far.
  int bincatout(string file, string mode, int pos)
  {
    Thread.Fifo fifo=Thread.Fifo(fifo_size);
    Stdio.File f=open(file,mode,pos);
    debug(3,"bincatout open(%s,%s,%d), pos=%d\n",file,mode,pos,f->tell());
    write("%4c",f->tell());
    thread_create(lambda(Stdio.File f,
                         Thread.Fifo fifo,
                         string file,
                         string mode,
                         int pos)
                  {
                    while(1)
                    {
                      string s=f->read(BLOCK_SIZE,1);
                      if(!s)
                      {
                        while(!(f=open(file,mode,pos)))
                          sleep(10+fifo->size()/10);
                        while(f->tell() < pos)
                          f->read(min(pos - f->tell(),65536));
                        continue;
                      }
                      if(!strlen(s)) break;
                      fifo->write(s);
                      pos+=strlen(s);
                      debug(4,"FIFO %d/%d\n",fifo->size(),fifo_size);
                    }
                    // A zero entry tells the draining loop to stop.
                    fifo->write(0);
                  },f,fifo,file,mode,pos);
    while(string s=fifo->read())
    {
      debug(4,"BINCATOUT, writing %d bytes\n",strlen(s));
      int written=write("%4c%s",strlen(s),s);
      add_sent(s);
      if(written!=strlen(s)+4) exit(1);
    }
    write("%4c",0);
    return 0;
  }

  // Receive blocks from stdin and write them to the file.  Any framing
  // or write problem ends the process with exit(1).
  int bincatin(string file, string mode, int pos)
  {
    object in=Stdio.File("stdin");
    Stdio.File f=open(file,mode,pos);
    debug(3,"bincatin open(%s,%s,%d), pos=%d\n",file,mode,pos,f->tell());
    write("%4c",f->tell());
    while(1)
    {
      string tmp=in->read(4);
      if(strlen(tmp||"")!=4)
      {
        debug(2,"bincatin: Read block length failed.\n");
        exit(1);
      }
      sscanf(tmp,"%4c",int len);
      debug(4,"BINCATIN, reading %d bytes\n",len);
      if(!len)
      {
        debug(2,"bincatin: Got EOF.\n");
        break;
      }
      string s=in->read(len);
      if(strlen(s||"")!=len)
      {
        debug(2,"bincatin: Failed to read input.\n");
        exit(1);
      }
      int written=f->write(s);
      add_sent(s);
      if(written!=len)
      {
        debug(2,"bincatin: Failed to wrie output.\n");
        exit(1);
      }
    }
    return 0;
  }
#else
  // Send a file out as blocks, reading and writing in a single loop.
  int bincatout(string file, string mode, int pos)
  {
    Stdio.File f=open(file,mode,pos);
    debug(3,"bincatout open(%s,%s,%d), pos=%d\n",file,mode,pos,f->tell());
    write("%4c",f->tell());
    while(1)
    {
      string s=f->read(BLOCK_SIZE,1);
      if(!s || !strlen(s)) break;
      debug(4,"BINCATOUT, writing %d bytes\n",strlen(s));
      int written=write("%4c%s",strlen(s),s);
      add_sent(s);
      if(written!=strlen(s)+4) exit(1);
    }
    write("%4c",0);
    return 0;
  }

  // Receive blocks from stdin and write them to the file.  Any framing
  // or write problem ends the process with exit(1).
  int bincatin(string file, string mode, int pos)
  {
    object in=Stdio.File("stdin");
    Stdio.File f=open(file,mode,pos);
    debug(3,"bincatin open(%s,%s,%d), pos=%d\n",file,mode,pos,f->tell());
    write("%4c",f->tell());
    while(1)
    {
      string tmp=in->read(4);
      if(strlen(tmp||"")!=4) exit(1);
      sscanf(tmp,"%4c",int len);
      debug(4,"BINCATIN, reading %d bytes\n",len);
      if(!len) break;
      // The whole block must arrive before the length check below, so
      // this has to be a blocking full read (as in the threaded
      // variant).  A "return what is available" read may come back short
      // and would make the check abort a healthy transfer.
      string s=in->read(len);
      if(strlen(s||"")!=len) exit(1);
      int written=f->write(s);
      add_sent(s);
      if(written!=len) exit(1);
    }
    return 0;
  }
#endif

  // Capacity handed to Thread.Fifo in the threaded bincatout().
  int fifo_size=10;

  // blocks: fifo capacity to use; 0 keeps the default.
  void create(int blocks)
  {
    if(blocks) fifo_size=blocks;
  }
};
// Per-host result of the SmartRemoteFS test() probe as recorded by
// get_smartremotefs_if_possible(): 1 = probe succeeded, -1 = probe
// failed, no entry = not probed yet.
mapping has_pcp_cache=([]);
// File system objects already created, keyed by "<protocol>:<host>".
mapping(string:VFileSystem) fscache=([]);
// Return a SmartRemoteFS for host if its test() probe succeeds (or has
// succeeded before), otherwise 0.  The probe result is remembered in
// has_pcp_cache and the object is cached under "ssh-proxy:<host>".
// proxies, when given, replaces the global ssh_proxies list for the
// returned object.
VFileSystem get_smartremotefs_if_possible(string host,
                                          void|array(string) proxies)
{
  if(has_pcp_cache[host]==-1)
    return 0;

  string cent="ssh-proxy:"+host;
  object fs=fscache[cent];
  if(!fs)
    fs=SmartRemoteFS(host);
  fscache[cent]=fs;
  fs->local_ssh_proxies=proxies || ssh_proxies;

  if(has_pcp_cache[host]==1)
    return fs;

  if(!fs->test())
  {
    has_pcp_cache[host]=-1;
    destruct(fs);
    return 0;
  }

  has_pcp_cache[host]=1;
  return fs;
}
// Resolve a path or URL into ({ file system object, path on it }).
//
// Recognised forms: plain paths and file:, proxy:, ftp:, http:, ssh:,
// ssh-raw: and ssh-proxy: URLs.  Any other "<word>:<rest>" is treated as
// the ssh shorthand host:path.  When ssh proxies are configured and the
// last proxy passes the SmartRemoteFS probe, the whole URL is handed to
// that proxy instead.  Exits the process on a malformed URL.
mixed fsopen(string path)
{
  string proto, host;
  if(!sscanf(path,"%s:%s",proto, path))
    proto="file";
  switch(proto)
  {
    case "file":
      sscanf(path,"//%s",path);
      return ({LocalFS(),path});

    case "proxy":
      return ({ProxyFS(0),path});

    default:
      // host:path shorthand, rewritten as an ssh URL
      path="//"+proto+"/"+path;
      proto="ssh";
      // fall through

    case "ssh-proxy":
    case "ssh-raw":
    case "ssh":
    case "ftp":
    case "http":
      if(sizeof(ssh_proxies))
      {
        if(object o=get_smartremotefs_if_possible(ssh_proxies[-1],
                                                  ssh_proxies[..sizeof(ssh_proxies)-2]))
        {
          return ({o,proto+":"+path});
        }
      }
      if(!sscanf(path,"//%s",path))
      {
        debug(2,"URL: %s\n",path);
        werror("Url format: "+proto+"://host/path\n");
        exit(1);
      }
      if(!sscanf(path,"%s/%s",host,path))
      {
        host=path;
        path="";
      }
      string cent=proto+":"+host;
      if(fscache[cent])
      {
        // An "ssh" entry holds a SmartRemoteFS exactly when the probe
        // for this host succeeded (has_pcp_cache[host]==1).  The first
        // call then returned the path with a "file:" prefix; a cache hit
        // must do the same, or a path containing ':' would be parsed as
        // a URL on the remote side.
        if(proto=="ssh" && has_pcp_cache[host]==1)
          return ({ fscache[cent], "file:"+path });
        return ({ fscache[cent], path });
      }
      switch(proto)
      {
        case "ftp":
          return ({fscache[cent]=FTPFS(host), path});

        case "ssh":
          if(object tmp=get_smartremotefs_if_possible(host))
          {
            return ({ fscache[cent]=tmp,"file:"+path });
          }else{
            return ({fscache[cent]=RemoteFS(host), path});
          }

        case "ssh-raw":
          return ({fscache[cent]=RemoteFS(host), path});

        case "ssh-proxy":
          return ({ fscache[cent]=SmartRemoteFS(host),path });

        case "http":
          return ({ fscache[cent]=HTTPFS(host),path });
      }
  }
  werror("\nfsopen failed!\n");
  exit(1);
}
// Progress snapshot: a transfer rate plus how much is done out of how
// much, with helpers that format those numbers for the status line.
// All three values are stored as floats; sizes are formatted in units
// of 1024.
class Status
{
  float bps=-1.0;
  float done=-1.0;
  float howmuch=-1.0;

  // b: rate, d: amount done, h: total amount.
  void create(float|int b,float|int d,float|int h)
  {
    bps=(float)b;
    done=(float)d;
    howmuch=(float)h;
  }

  // Remaining amount divided by 1024, as a string.
  string left()
  {
    return ""+(int)((howmuch-done)/1024.0);
  }

  // "<done>/<total>", both divided by 1024.
  string done_of_howmuch()
  {
    return sprintf("%d/%d",(int)(done/1024.0),(int)(howmuch/1024.0));
  }

  // Rate divided by 1024, with two decimals.
  string kbps()
  {
    return sprintf("%2.2f",bps/1024.0);
  }

  // Estimated time left, formatted by magnitude: days+hours from one
  // day up, hours+minutes from one hour up, minutes+seconds below that.
  // Empty string when the rate is not positive or the estimate is
  // negative.
  string eta()
  {
    if(bps<=0.0) return "";
    int seconds=(int)((howmuch-done)/bps);
    // Explicit thresholds, largest first.  This replaces a switch whose
    // open-ended case ranges (0.., 3600.., 86400..) overlapped, so the
    // result no longer depends on how overlapping ranges are resolved.
    if(seconds<0)
      return "";
    if(seconds>=86400)
      return sprintf("%dd%02dh",seconds/86400,(seconds/3600)%24);
    if(seconds>=3600)
      return sprintf("%dh%02dm",seconds/3600,(seconds/60)%60);
    return sprintf("%dm%02ds",seconds/60,seconds%60);
  }
};
// Name shown at the start of the status line by update_status().
string base;

// Note which file is being worked on.  Only the display name b is
// stored; the remaining arguments are accepted but not used here.
void update_file(VFileSystem from_fs, string from,
                 VFileSystem to_fs, string to,
                 string b)
{
  base=b;
}
// Build the status line from the current file name (base) and whichever
// of the three parts are given, then show it with status_line().
//   file:    progress of the current file (adds done/total and rate)
//   all:     overall progress (adds amount left and estimated time)
//   message: free text appended last
void update_status(Status file, Status all, string message)
{
  string line=" "+base;

  if(file)
    line+=" "+file->done_of_howmuch()+" kb "+file->kbps()+" kb/s";

  if(all)
  {
    // werror("%O %O\n",all->howmuch, all->done);
    line+=" left: "+all->left()+" "+all->eta();
  }

  if(message)
    line+=" "+message;

  status_line(line);
}
// Copies one source entry to a destination.  All work happens in
// create(): symlinks are recreated, directories are descended into
// (one low_copy per entry), and regular files are copied block by block,
// resuming at the size the destination already has and restarting from
// scratch when verification fails.
class low_copy
{
  VFileSystem from_fs;
  string from;
  VFileSystem to_fs;
  string to;
  int size;                 // size of the source as reported by file_size()
  string base;              // display name of the source
  int old_start=-10;        // value of start in the previous attempt
  int retries=0;            // attempts in a row that began at the same offset
  int start=-1;             // bytes present at the destination / copied so far
  Bps file_bps=Bps();
  int restart_from_scratch=0;

  // Overall progress for the status line, or 0 when no rate is available
  // yet.  With a known total_size the figures cover the whole job,
  // otherwise only this file.
  Status eta()
  {
    float my_bps;
    float left;
    float total;
    if(total_size!=-1.0)
    {
      total=(float)total_bytes;
      left=total_size - total_bytes - start;
      float tid=time(base_time)-first_bps_time;
      if(tid<=0.0) return 0;
      my_bps=bytes/tid;
    }else{
      left=(float)(size-start);
      my_bps=bps;
      total=(float)size;
    }
    if(my_bps<=0.0)
    {
      debug(2,"eta(): ZERO BPS\n");
      return 0;
    }
    return Status(my_bps,total-left ,total);
  }

  // Redraw the status line, optionally with a message.
  void refresh(void|string msg)
  {
    update_status(Status(bps, start, size), eta(), msg);
  }

#if constant(alarm)
#define ALARM_DELAY 5
  int alarm_counter;

  // SIGALRM handler: refreshes the status line and re-arms the alarm.
  // Once the counter has passed 5*60/ALARM_DELAY ticks it throws
  // "timeout", which END_ALARM() turns into a break.
  void alarm_func()
  {
    debug(1,"\nALARM\n");
    int timeout=alarm_counter++ > 5 * 60 / ALARM_DELAY;
    add_sent(0);
    update_status(Status(bps, start, size), eta(), timeout&&"timeout");
    // status_line(" %s %d/%d kb %1.2f kb/s%s%s",base,start/1024,size/1024,bps/1024.0,eta(),timeout?" timeout":"");
    alarm(ALARM_DELAY);
    if(timeout) throw(({"timeout",backtrace()}));
  }

// START_ALARM() ... END_ALARM() bracket a statement list.  The statements
// run inside a catch with the alarm armed.  Afterwards a "timeout" error
// becomes a `break' out of the enclosing loop and any other error is
// rethrown.  The pair must therefore be used directly inside a loop.
// The last line of START_ALARM() must not end in a backslash, or the
// following #define would become part of this macro.
#define START_ALARM() \
  debug(3,"ALARM activated\n"); \
  alarm_counter=0; \
  alarm(ALARM_DELAY); \
  signal(signum("SIGALRM"),alarm_func); \
  { mixed _foo = catch {

#define END_ALARM() \
  }; \
  alarm_counter=0; \
  alarm(0); \
  signal(signum("SIGALRM"),lambda(){}); \
  debug(3,"ALARM deactivated\n"); \
  if(arrayp(_foo) && sizeof(_foo) && _foo[0]=="timeout") break; if(_foo) throw(_foo); }
#else
#define START_ALARM()
#define END_ALARM()
#endif

  // Perform the copy.
  //   _from_fs/_from: source file system and path
  //   _to_fs/_to:     destination file system and path
  //   name_combined:  nonzero when _to already names the final target;
  //                   zero when _to may be a directory to copy into
  void create(VFileSystem _from_fs,
              string _from,
              VFileSystem _to_fs,
              string _to,
              int name_combined)
  {
    from_fs=_from_fs;
    from=_from;
    to_fs=_to_fs;
    to=_to;
    foreach(except, Regexp r)
      if(r->match(from))
        return;

    // FIXME: move the tests to the filesystems themselves
    if(optimize &&
       !(to_fs->query_fs_type()=="file" ||
         from_fs->query_fs_type()=="file"))
    {
      if(to_fs->query_fs_type()=="ssh-proxy")
        if(!to_fs->do_optimize(({from_fs->mkurl(from),to})))
          return;
      // The request built here uses a path local to the source side and
      // a URL for the target, so it has to be sent to from_fs.  It used
      // to be sent to to_fs, which need not be an ssh-proxy file system
      // at all.
      if(from_fs->query_fs_type()=="ssh-proxy")
        if(!from_fs->do_optimize(({from,to_fs->mkurl(to)})))
          return;
    }

    base=from_fs->basename(from);
    debug(2,"From = %s, To = %s, base = %s\n",from,to,base);
    update_file(from_fs,from,to_fs,to,base);
    update_status(0,0,0);
    // status_line(" %s ",base);

    switch(size=from_fs->file_size(from))
    {
      case -1:
        werror("No such file "+from+".\n");
        return;

      case -3:
      {
        /* Link */
        switch(to_fs->file_size(to,1))
        {
          case -2:
            if(!name_combined)
            {
              to=to_fs->combine_path(to,base);
              name_combined=1;
            }else{
              werror("Target already exists as a directory.\n");
              exit(1);
            }
            // fall through: remove whatever is at the target name
          case -3:
          case 0..:
            if(!to_fs->rm(to))
            {
              werror("Failed to remove file/previos symlink.\n");
              exit(1);
            }
        }
        if(string as=from_fs->readlink(from))
        {
          if(to_fs->symlink(as,to))
          {
            update_status(0,0," -> "+as+"\n");
            return;
          }else{
            output("\nFailed to create symlink (%s -> %s)\n",to,as);
            exit(1);
          }
        }else{
          output("\nFailed to read symlink.\n");
          exit(1);
        }
      }

      case -2:
        /* Directory */
        if(!recurse) return;
        update_status(0,0,":");
        // status_line(" %s : ",base);
        debug(2,"\nDirectory found, descending.\n");
        if(!name_combined)
        {
          debug(2,"Combinining target %s and %s\n",to,base);
          to=to_fs->combine_path(to,base);
          name_combined=1;
          update_file(from_fs,from,to_fs,to,base);
        }
        switch(to_fs->file_size(to))
        {
          default:
            werror("Directory %s already exists as a file.\n",to);
            exit(1);

          case -1:
            debug(2,"\nCreating target directory %s.\n",to);
            while(1)
            {
              to_fs->mkdir(to);
              switch(to_fs->file_size(to))
              {
                default:
                  werror("Directory %s already exists as a file.\n",to);
                  exit(1);
                case -2:
                  debug(2,"\nMkdir succeded.\n");
                  break;
                case -1:
                  werror("Mkdir %s failed.\n",to);
                  continue;
              }
              break;
            }
            break;

          case -2:
            debug(2,"\nTarget directory already exists.\n");
            break;
        }
        string *tmp=from_fs->get_dir(from);
        if(!tmp)
        {
          update_status(0,0,": permission denied\n");
          return;
        }
        sort(tmp);
        update_status(0,0,sprintf(": %d files/directories\n",sizeof(tmp)));
        // status_line(" %s : %d files/directories\n",base,sizeof(tmp));
        foreach(tmp, string file)
        {
          debug(2,"Recusively copying %s\n",file);
          low_copy(from_fs,from_fs->combine_path(from,file),
                   to_fs,to_fs->combine_path(to,file),
                   1);
        }
        return;

      default:
    }

    /* Regular file: keep trying until the destination is complete. */
    while(1)
    {
      start=to_fs->file_size(to);
      switch(start)
      {
        case -2:
          if(!name_combined)
          {
            to=to_fs->combine_path(to,base);
            // Record that the name has been combined.  Leaving this at
            // 0 would append base again on every pass for as long as
            // the result is a directory, and the error below could
            // never be reached from here.
            name_combined=1;
            continue;
          }
          werror("Target already exists as a directory.\n");
          exit(1);

        case -1: /* FIXME, a nonexistant file is not the same as a zero-length file! */
          /* Consider setting restart_from_scratch here */
          break;

        case -4:
        case -3:
          werror("Target already exists as special (device) file.\n");
          exit(1);
      }
      if(start < old_start)
      {
        status("\nFILE SHRUNK from %d to %d bytes\n",old_start,start);
      }
      debug(1,"\nSTART = %d, SIZE=%d\n",start,size);
      if(start>=size)
      {
        int wrong;
        if(start > size && size>=0)
        {
          wrong++;
        }else{
          if(verify)
          {
            int fck, tck;
            if(fck=from_fs->cksum(from))
              tck=to_fs->cksum(to);
            if(fck && tck)
            {
              if(tck != fck)
                wrong++;
            }else{
              switch(to_fs->verify(to))
              {
                case 0: wrong++; break;
                case 1:
                  debug(1,"Single-sided verify success!\n");
                  break;
                case -1:
                  if(from_fs->file_size(from) != to_fs->file_size(to))
                    wrong++;
              }
            }
            // werror("Verify: %d?=%d %d?=%d\n",ck1,ck2,len1,len2);
          }
        }
        if(wrong)
        {
          update_status(0,0,"Verify failed\n");
          // status_line(" Verify failed!\n");
          // exit(0);
          // to_fs->rm(to);
          restart_from_scratch=1;
          start=0;
        }else{
          break;
        }
      }

      // Move the tests to the filesystems themselves
      if(optimize &&
         from_fs->query_fs_type()=="ftp" &&
         to_fs->query_fs_type()=="ftp")
      {
        if(from_fs->do_optimize(from,to_fs,to))
          continue;
      }

      // Open the source.  tmptmp stays 0 when END_ALARM() broke out of
      // the one-shot loop because of a timeout.
      int tmptmp=0;
      Stdio.File f;
      do
      {
        START_ALARM();
        f=from_fs->open(from,"r",max(start,0));
        END_ALARM();
        tmptmp=1;
      }while(0);
      if(!tmptmp)
      {
        update_status(0,0,sprintf("open timeout"));
        continue;
      }
      if(!f)
      {
        werror("Open failed (file does not exist?)\n");
        break;
      }

      // The source may have opened before the wanted offset; read and
      // discard until the offset is reached.
      while(f->tell() < start)
      {
        update_status(0,0,sprintf(">> %d bytes",start-f->tell()));
        debug(1,"Forwarding manually %d bytes\n",start-f->tell());
        START_ALARM();
        f->read(min(start - f->tell(),BLOCK_SIZE),1);
        END_ALARM();
      }
      if(f->tell() < start)
      {
        // Only reached when the skip loop was left by a timeout.  The
        // message has no format directive, so nothing else is passed.
        update_status(0,0,">> timeout");
        continue;
      }

      // Back off when no progress was made since the previous attempt.
      if(start == old_start)
      {
        int attempt=retries++;
        if(attempt>=10)
          sleep(300);
        else if(attempt>=2)
          sleep(retries*20);
      }else{
        retries=0;
      }
      old_start=start;
      update_status(Status(bps, start, size),eta(),0);
      // status_line(" %s %d/%d kb %1.2f kb/s%s",base,start/1024,size/1024,bps/1024.0,eta());

      Stdio.File data;
      if(restart_from_scratch)
        data=to_fs->open(to,"wct",0);
      else
        data=to_fs->open(to,"caw");
      /* File now exists (and if just created, at position zero) */
      if(start<0) start=0;

      while(1)
      {
        string s;
        int written;
        START_ALARM();
        s=f->read(BLOCK_SIZE,1);
        END_ALARM();
        if(!s || !strlen(s))
        {
          debug(4,"low_copy: read failed %O\n",s);
          break;
        }
        debug(4,"low_copy: Got %d bytes\n",strlen(s));
        add_sent(s,refresh);
        start+=strlen(s);
        START_ALARM();
        written=data->write(s);
        END_ALARM();
        if(written!=strlen(s))
        {
          debug(1,"\nWrite failed (%d != %d, %s), reconnecting.\n",written,strlen(s),(string)strerror(data->errno()));
          break;
        }
        update_status(Status(bps, start, size),eta(),0);
        // status_line(" %s %d/%d kb %1.2f kb/s%s",base,start/1024,size/1024,bps/1024.0,eta());
      }
      destruct(data);
      destruct(f);
    }
    update_status(Status(file_bps->bps(), start, size),eta(),"Done\n");
    // status_line(" %s %d/%d kb (%1.2f kb/s) Done\n",base,start/1024,size/1024,file_bps->bps()/1024.0);
    total_bytes+=size;
  }
}
// Turn one source URL into the list of entries to copy.
// The visible part resolves the URL with fsopen(), expands it on its
// file system when it contains '?' or '*', drops every name matched by
// one of the `except' patterns and, unless no_summaries is set, asks the
// file system for a recursive_sizeof summary per remaining name.
ToDo *preprocess(string from, int no_summaries)
{
array(ToDo) ret=({});
VFileSystem from_fs;
status_line("Expanding %s",from);
[from_fs, from] = fsopen(from);
string *files;
files=({from});
if(search(from,"?")!=-1 || search(from,"*")!=-1)
files=from_fs->expand(files);
// Keep only the names that match none of the exclusion patterns.
files=Array.filter(files,lambda(string s) {
foreach(except, Regexp r)
if(r->match(s))
return 0;
return 1;
});
array(Summary) summaries;
if(no_summaries)
summaries=allocate(sizeof(files));
else
summaries=from_fs->map(files,"recursive_sizeof");
// NOTE(review): the next line is damaged in this copy of the file.  The
// loop header breaks off after "e" and runs straight into what looks
// like the read/write loop of a different function, so the rest of
// preprocess() and the start of that function are missing.  Restore
// this region from a pristine copy before changing any code here.
for(int e=0;eread(BLOCK_SIZE,1))
{
if(!strlen(s)) break;
int written=write(s);
if(written!=strlen(s)) break;
}
}
};
// Parse the options shared by every mode of the program.
//
//   argv:  the command line; recognised options are consumed by Getopt
//   xopts: extra option descriptions (Getopt format) owned by the caller
//
// Options known here update the corresponding global setting and, with
// the exception of --ssh-proxy, are also appended to the global `args'
// list in short form.  Returns the parsed ({ name, value }) pairs of the
// options this function does not handle itself, i.e. those from xopts.
// Also installs handlers for SIGTERM and SIGQUIT when signals exist.
array parse_options(array(string) argv,
                    void|array xopts)
{
  /* Debugging options */
  array ret=({});
#if constant(signal)
  signal(signum("SIGTERM"),lambda()
         {
           throw(({"timeout",backtrace()}));
         });
  signal(signum("SIGQUIT"),lambda()
         {
           // This handler is installed for SIGQUIT, so that is the
           // signal it has to name (it used to say SIGTERM).
           master()->handle_error( ({"\nSIGQUIT received, printing backtrace and continuing.\n",backtrace() }) );
         });
#endif
  if(!xopts) xopts=({});
  foreach(Getopt.find_all_options(argv,aggregate(
    ({"speed",Getopt.HAS_ARG,({"-s","--speed"}),"PCP_SPEED"}),
    ({"except",Getopt.HAS_ARG,({"-v","--except"}),"PCP_AVOID"}),
    ({"block size",Getopt.HAS_ARG,({"-b","--block-size"}),"PCP_BLOCK_SIZE"}),
    ({"recur",Getopt.NO_ARG,({"-r","--no-recurse"}),"PCP_NO_RECURSE"}),
    ({"debug",Getopt.MAY_HAVE_ARG,({"-d","--debug"}),"PCP_DEBUG"}),
    ({"verify",Getopt.NO_ARG,({"-V","--verify"}),"PCP_VERIFY"}),
    ({"optimize",Getopt.NO_ARG,({"--no-optimize","-O"}),"PCP_NO_OPTIMIZE"}),
    ({"glob",Getopt.NO_ARG,({"--no-glob","-g"}),"PCP_NO_GLOB"}),
    ({"compression",Getopt.HAS_ARG,({"--compression","-C"}),"PCP_COMPRESSION"}),
    ({"interactive",Getopt.NO_ARG,({"--interactive","-i"}),"PCP_INTERACTIVE"}),
    ({"ssh-proxy",Getopt.HAS_ARG,({"--ssh-proxy"}),"PCP_PROXY"}),
    ({"user",Getopt.HAS_ARG,({"-l","--user"}),"PCP_USER"}),
    ({"passwd",Getopt.HAS_ARG,({"-p","--passwd"}),"PCP_PASSWD"}),
    )+xopts),array opt)
  {
    switch(opt[0])
    {
      default: ret+=({opt}); break;

      case "user":
        default_user=opt[1];
        args+=({"-l"+opt[1]});
        break;

      case "passwd":
        default_passwd=opt[1];
        args+=({"-p"+opt[1]});
        break;

      case "compression":
        args+=({"-C"+opt[1]});
        compression=(int)opt[1];
        break;

      case "speed":
        args+=({"-s"+opt[1]});
        read_speed=1024.0 * ( (float)opt[1] + 0.00025 );
        break;

      case "except":
        args+=({"-v"+opt[1]});
        except+=({Regexp(opt[1])});
        break;

      case "recur":
        args+=({"-r"});
        recurse=!recurse;
        break;

      case "debug":
        // Only a non-empty string is an explicit level.  Anything else
        // (0, "", or the non-string marker reported for an option
        // given without a value) counts as one more -d.
        if(stringp(opt[1]) && opt[1]!="")
          debug_level=(int)opt[1];
        else
          debug_level++;
        args+=({"-d"+debug_level});
        break;

      case "verify":
        args+=({"-V"});
        verify++;
        break;

      case "optimize":
        args+=({"-O"});
        optimize=0;
        break;

      case "glob":
        args+=({"-g"});
        no_expand=1;
        break;

      case "block size":
        BLOCK_SIZE=(int)opt[1];
        if(BLOCK_SIZE<1)
        {
          werror("Block size must be larger than zero.\n");
          exit(1);
        }
        args+=({"-b"+BLOCK_SIZE});
        break;

      case "ssh-proxy":
        // Several hosts may be given at once, separated by ':'.
        ssh_proxies+=opt[1]/":";
        break;

      case "interactive":
        interactive++;
        args+=({"-i"});
        break;
    }
  }
  return ret;
}
// Terminate the collected option list in `args' with "--" and return
// what Getopt.get_args() yields for argv.
array(string) filter_args(array(string) argv)
{
  args+=({"--"});
  array(string) remaining=Getopt.get_args(argv);
  return remaining;
}
// Help text for the options handled by parse_options(); main() appends
// it to the usage message.  -l/--user and -p/--passwd are accepted by
// parse_options() and are therefore listed here as well.
constant globalopts=#"\
-s --speed= : Limit transfer speed
-v --except= : Don't fetch files/dirs matching this
-b --block-size= : Read/write this many bytes at a time
-C --compression=: Gz compression
-r --no-recurse : don't recurse into subdirs
-d --debug= : show debug messages
-V --verify : verify transfers if possible
-O --no-optimize : don't optimize host-to-host transfers
-g --no-glob : don't do internal globbing
-i --interactive : tries to use interactive tcp priority
--ssh-proxy= : go through this host
-l --user= : user to connect as
-p --passwd= : password to use
";
// Line-oriented reader on top of a Stdio.File.  Data is pulled in
// BLOCK_SIZE pieces into a buffer (b); bpos marks the first byte of the
// buffer that has not been handed out yet.
class LineFile
{
  private string b="";
  private int bpos=0;
  Stdio.File f;

  // path:   "-" for standard input, otherwise a path/URL that is
  //         resolved and opened through copier->fsopen()
  // copier: object providing fsopen()
  void create(string path, object copier)
  {
    if(path=="-")
    {
      f=Stdio.stdin;
    }else{
      VFileSystem fs;
      string where=path;
      [fs, where]=copier->fsopen(path);
      f=fs->open(where,"r",0);
      // Say so right away instead of failing on the first read.
      if(!f) werror("Failed to open %s\n",path);
    }
  }

  // Drop the consumed part of the buffer and append one more chunk.
  // Returns 0 when no more data could be read.
  inline private static nomask int get_data()
  {
    string s;
    b = b[bpos .. ];
    bpos=0;
    if(!f) return 0;
    s = f->read(BLOCK_SIZE,1);
    if(!s || !strlen(s))
      return 0;
    b += s;
    return 1;
  }

  // Hand out `bytes' bytes from the buffer and advance past them plus
  // `skip' further bytes.
  inline private static nomask string extract(int bytes, int|void skip)
  {
    string s;
    s=b[bpos..bpos+bytes-1];
    bpos += bytes+skip;
    return s;
  }

  // Return the next line without its newline, or 0 at end of input.
  // A last line that is not terminated by a newline is returned too.
  string gets()
  {
    int p,tmp=0;
    while((p=search(b, "\n", bpos+tmp)) == -1)
    {
      // What is buffered has been searched already; continue after it.
      tmp=strlen(b)-bpos;
      if(!get_data())
      {
        // End of input.  get_data() has trimmed the buffer, so whatever
        // is left in b is an unterminated final line.
        if(!strlen(b)) return 0;
        string last=b;
        b="";
        bpos=0;
        return last;
      }
    }
    return extract(p-bpos, 1);
  }
}
// Program entry point.
//
// Parses the command line and then does one of:
//   --version          print the version id and exit
//   --pcp-server-mode  serve file system requests arriving on stdin
//   --list             print the expanded source list and exit
//   otherwise          copy every source (and, with -T/--from-file,
//                      every URL listed in that file) to the last
//                      argument
int main(int argc, string *argv)
{
  string from_file;
  int list,no_summary;
  CopyThread copier=CopyThread();
  // --pcp-server-mode deliberately has no environment variable.  It
  // used to share PCP_PROXY with --ssh-proxy, so choosing a proxy
  // through the environment switched the program into server mode.
  foreach(parse_options(argv,aggregate(
    ({"list",Getopt.NO_ARG,({"--list"})}),
    ({"version",Getopt.NO_ARG,({"--version"})}),
    ({"quiet",Getopt.NO_ARG,({"-q","--quiet"}),"PCP_QUIET"}),
    ({"proxy",Getopt.MAY_HAVE_ARG,({"--pcp-server-mode"})}),
    ({"from-file",Getopt.HAS_ARG,({"-T","--from-file"})}),
    ({"no-summary",Getopt.NO_ARG,({"--no-summary"})}),
    )),mixed opt)
  {
    switch(opt[0])
    {
      case "no-summary":
        no_summary++;
        break;

      case "from-file":
        no_summary++;
        from_file=opt[1];
        break;

      case "quiet":
        args+=({"-q"});
        quiet=!quiet;
        break;

      case "list":
        args+=({"--list"});
        list=1;
        break;

      case "version":
        write("%s\n", ID);
        exit(0);

      case "proxy":
      {
        // Server mode.  Each request is a 4-byte length followed by an
        // encoded array ({ method name, arguments... }); the encoded
        // result is sent back framed the same way.
        int kb_cache=(int)opt[1];
        object codec=Codec();
        quiet=1;
        debug(1,"Server started\n");
        write(PROTOCOL_VERSION);
        object fs=copier->ProxyFS(kb_cache);
        while(string s=Stdio.stdin->read(4))
        {
          if(strlen(s)!=4) exit(0);
          int len;
          if(!sscanf(s,"%4c",len)) exit(1);
          string raw=Stdio.stdin->read(len);
          // A request cut short cannot be decoded; stop as for the
          // other framing errors.
          if(!raw || strlen(raw)!=len) exit(1);
          mixed data=decode_value(raw,codec);
          debug(2,"Serving %s\n",data[0]);
          data=encode_value(`->(fs,data[0])(@data[1..]),codec);
          write("%4c%s",strlen(data),data);
        }
        exit(1);
      }
    }
  }
  argv=filter_args(argv);
  // One source and a destination are required; --list and --from-file
  // each make do with one argument less.
  if(sizeof(argv)<3- ((list||from_file)&&1) )
  {
    werror(
#"PCP version: "+ID+#"
Copyright Fredrik Hübinette.
This program may be freely copied under the terms of the
GNU General Public Licence (GPL).
Usage: pcp [options] from [from from from] to
From and to are can be any of:
ftp://[user[:pass]@]host/path : ftp transfer
http://host[:port]/path : http transfer
ssh://[user@]host/path or host:path : ssh transfer
file:/path or path : local files
ssh-proxy://[user@]host/URL : re-routed transfers
Options:
-q --quiet : don't show progress
--list : list files
--version : print the pcp version
-T --from-file : read files to copy from 'file'
--no-summary : don't compute total download time
"+globalopts);
    exit(0);
  }
  // trace(1);
  debug(1,"DEBUG=%d VERSION=%s\n",debug_level,ID);
  VFileSystem to_fs;
  status_line("Expanding");
  array data=`+( ({}),@Array.map(argv[1..sizeof(argv)-2+list],copier->preprocess,no_summary));
  erase_status_line();
  if(!no_summary)
  {
    total_size=`+(0.0, @ (array(float)) data->summary->bytes);
    status("To copy: %d Kb, %d files, %d dirs.\n",
           (int)(total_size/1024),
           `+(0,@data->summary->files),
           `+(0,@data->summary->dirs));
  }
  if(list)
  {
    foreach(data, ToDo x)
    {
      foreach(except, Regexp r)
      {
        if(r->match(x->path))
        {
          x=0;
          break;
        }
      }
      if(!x) continue;
      // Summaries are absent when they were switched off; show the
      // entry without a size rather than indexing 0.
      if(x->summary)
        write("%8d %s\n",x->summary->bytes,x->from_fs->mkurl(x->path));
      else
        write("%8s %s\n","?",x->from_fs->mkurl(x->path));
    }
    exit(0);
  }
  string to=argv[-1];
  [to_fs, to]=copier->fsopen(to);
  int isdir=to_fs->file_size(to)==-2;
  debug(2,"Destination isdir: %d\n",isdir);
  if(sizeof(argv)!=3 && !isdir)
  {
    werror("Destination is not a directory.\n");
    exit(1);
  }
#if 0
  if(optimize && to_fs->query_fs_type()=="ssh-proxy")
  {
    argv[-1]=to;
    if(!to_fs->do_optimize(to_fs, args+argv[1..]))
      return 0;
  }
#endif
  foreach(data, ToDo x)
  {
    int save_total_bytes=total_bytes;
    // low_copy(x->from_fs,x->path,to_fs,
    // isdir?to_fs->combine_path(to,basename(x->path)):to);
    copier->low_copy(x->from_fs,x->path,to_fs,to,0);
    if(x->summary)
      total_bytes=save_total_bytes+x->summary->bytes;
  }
  if(from_file)
  {
    LineFile f=LineFile(from_file, copier);
    total_size=-1.0;
    while(string url=f->gets())
    {
      // Skip empty lines and lines starting with '#'.
      if(!strlen(url)) continue;
      if(url[0]=='#') continue;
      // preprocess() takes two arguments.  Summaries are not used in
      // this mode (total_size is -1.0), so they are switched off.
      foreach(copier->preprocess(url,1), ToDo x)
      {
        copier->low_copy(x->from_fs,x->path,to_fs,to,0);
      }
    }
  }
  status("Done.\n");
  return 0;
}