xop/ 40777 0 0 0 10525315235 10037 5 ustar user group xop/.includepath 100666 0 0 171 10443063504 12412 0 ustar user group
xop/.project 100666 0 0 571 10440651556 11573 0 ustar user group
xoporg.epic.perleditor.perlbuilderorg.epic.perleditor.perlnature
xop/test.pl 100666 0 0 270 10525617316 11434 0 ustar user group use xcopy;
# Build a copier: 2 worker threads, 10 files per bucket, then the log file
# and the error log file.
my $cp = xcopy->new(2, 10, 'c:/xcopy.log', 'c:/error.log' );
# Queue two copy tasks (source dir, destination dir, flags); the empty
# flag string sets no options.
$cp->addTask( 'c:/src', 'c:/dest', '' );
$cp->addTask('c:/Cleaner3','c:/dest/cleaner3','');
# Start the loggers, run the queued tasks and wait for the threads to end.
$cp->run();
xop/tsk/ 40777 0 0 0 10525467057 10652 5 ustar user group xop/tsk/bucket.pm 100666 0 0 1660 10525602102 12543 0 ustar user group package tsk::bucket;
use strict;
# Constructor: creates an empty bucket of (source, destination) file pairs.
# The two lists and the item counter are stored by reference in the object.
sub new {
    my ($class) = @_;
    my $count = 0;
    my $self  = bless {
        list_src  => [],
        list_dest => [],
        items     => \$count,
    }, $class;
    return $self;
}
# Appends one (source file, destination file) pair to the bucket and
# increments the item counter.
sub push {
    my ($this, $srcFile, $destFile) = @_;
    my ($sources, $targets, $counter) = @{$this}{qw(list_src list_dest items)};
    push(@$sources, $srcFile);
    push(@$targets, $destFile);
    $$counter++;
}
# Returns true while the item counter is greater than zero.
sub has_items {
    my ($this) = @_;
    my $counter = $this->{'items'};
    return ( $$counter > 0 );
}
# Returns the current value of the item counter.
sub size {
    my ($this) = @_;
    my $counter = $this->{'items'};
    return $$counter;
}
# Removes and returns the most recently pushed (source, destination) pair.
# Returns an empty list when the bucket holds no pairs; the counter is left
# untouched in that case so it can never become negative (a negative count
# would hide pairs pushed afterwards from has_items).
sub pop
{
    my ($this) = @_;
    return unless @{ $this->{'list_src'} };
    my $src  = pop( @{ $this->{'list_src'} } );
    my $dest = pop( @{ $this->{'list_dest'} } );
    ${ $this->{'items'} }--;
    return ( $src, $dest );
}
# Empties both pair lists and resets the item counter to zero.
sub clean {
    my ($this) = @_;
    foreach my $key (qw(list_src list_dest)) {
        undef( @{ $this->{$key} } );
    }
    ${ $this->{'items'} } = 0;
}
1;
xop/tsk/ErrorHandler.pm 100666 0 0 3462 10440651622 13666 0 ustar user group #-------------------------------------------------------------------------------------
#simple error handler logger module
# Author: Sanjit Rath March 2006
# bugs mailto: sanjit.rath@oracle.com
#-------------------------------------------------------------------------------------
package ErrorHandler;
use strict;
#private subroutine to return format strings
#replaces the first --file, --package and --line placeholder with the
#file, package and line of this module; an undefined format becomes ''
sub __formatter
{
    my ($formatString) = @_;
    return '' unless defined($formatString);
    my ( $file, $package, $line ) = ( __FILE__, __PACKAGE__, __LINE__ );
    $formatString =~ s/--file/$file/;
    $formatString =~ s/--package/$package/;
    $formatString =~ s/--line/$line/;
    return $formatString;
}

#constructor
#arguments:
# format  prefix for every message, supported placeholders
#         --package --file --line
# method  'normal' (default when undefined) or 'log'
# logFile log file, required when method is 'log'
#returns: blessed ErrorHandler object
#dies if perl is older than 5.8 or if method is 'log' without a log file
sub new
{
    my ( $class, $format, $method, $logFile ) = @_;
    if ( $] < 5.008 )
    {
        die("Script requires perl version 5.8 or higher");
    }
    $method = 'normal' unless defined($method);
    my $log = 0;
    if ( $method eq 'log' )
    {
        defined($logFile) or die("log file not defined\n");
        $log = $logFile;
    }
    my $self = {
        __format  => __formatter($format),
        __method  => $method,
        __logFile => $log
    };
    #the object must be blessed, otherwise $obj->error() cannot be called
    return bless( $self, $class );
}
#reports a message, prefixed with the configured format string
#both supported methods currently write to STDERR because file logging
#is not implemented yet; any other method prints nothing
sub error {
    my ( $this, $msg ) = @_;
    my $method = $this->{__method};
    my $text   = $this->{__format} . $msg;
    print STDERR $text if ( $method eq 'normal' || $method eq 'log' );
}
#static method to check arguments
#usage ErrorHandler::checkArgs(\@_,number_of_arguments);
#arguments:
# listRef reference to the argument list to check
# args    minimum number of arguments expected
#returns: 1 if the list holds at least args elements
#dies otherwise; the message names the file, package and line of the code
#that called checkArgs, so the offending call site can be located
sub checkArgs
{
    my ( $listRef, $args ) = @_;
    return 1 if ( scalar(@$listRef) >= $args );
    my ( $package, $file, $line ) = caller;
    die("Error: WrongNumber of Arguments:\nFile: $file\nPackage: $package\nLine: $line");
}
1; #return true
xop/tsk/logger.pm 100666 0 0 4254 10525611501 12552 0 ustar user group package tsk::logger;
use strict;
use IO::File;
use threads;
use threads::shared;
# Constructor: remembers the log and error file names and creates the two
# shared message queues plus the shared stop flag used by the writer threads.
sub new ($$)
{
    my ($class, $logFile, $errorFile) = @_;
    my @pendingLogs : shared;
    my @pendingErrors : shared;
    my $stopFlag : shared = 0;
    my %fields = (
        _logFile  => $logFile,
        _errFile  => $errorFile,
        _logQueue => \@pendingLogs,
        _errQueue => \@pendingErrors,
        _stop     => \$stopFlag,
    );
    return bless \%fields, $class;
}
# Queues a message for the log file and wakes a thread waiting on the queue.
sub log {
    my ($this, $msg) = @_;
    my $pending = $this->{'_logQueue'};
    lock(@$pending);
    push(@$pending, $msg);
    cond_signal(@$pending);
}
# Queues a message for the error file and wakes a thread waiting on the queue.
sub error {
    my ($this, $msg) = @_;
    my $pending = $this->{'_errQueue'};
    lock(@$pending);
    push(@$pending, $msg);
    cond_signal(@$pending);
}
# Private helper: body shared by both writer threads.
# Opens $path for writing, then writes every message taken from the shared
# queue on its own line. It returns once the stop flag is set and the queue
# has been drained. Dies (ending only the calling thread) if the file cannot
# be opened.
sub __drainQueue
{
    my ($path, $queue, $stopRef) = @_;
    open(my $out, '>', $path) or die("Error: Failed to open log file: $path");
    WRITE: while (1)
    {
        my $msg;
        {
            lock(@$queue);
            cond_wait(@$queue) until ( @$queue || $$stopRef );
            # woken by stop() with nothing left: finish instead of writing
            # a blank line for a message that does not exist
            last WRITE unless @$queue;
            $msg = shift @$queue;
        }
        print {$out} $msg, "\n";
    }
    close($out);
}

# Thread entry point: writes the log queue to the log file.
sub __logger
{
    my ($this) = @_;
    __drainQueue( $this->{'_logFile'}, $this->{'_logQueue'}, $this->{'_stop'} );
}

# Thread entry point: writes the error queue to the error file.
sub __error
{
    my ($this) = @_;
    __drainQueue( $this->{'_errFile'}, $this->{'_errQueue'}, $this->{'_stop'} );
}

# Starts the two writer threads (log and error).
# Returns 1; dies if a thread cannot be created. The threads are not joined
# here, they end after stop() has been called and their queue is empty.
sub start()
{
    my ($this) = @_;
    my $tid = threads->create( \&__logger, $this );
    unless ( defined($tid) )
    {
        die("Error: Failed to start logger theread");
    }
    $tid = threads->create( \&__error, $this );
    unless ( defined($tid) )
    {
        die("Error: Failed to start error logger theread");
    }
    return 1;
}
# Requests the writer threads to finish: sets the shared stop flag while
# holding both queue locks, then wakes every thread waiting on either queue.
sub stop()
{
    my ($this) = @_;
    my ($logs, $errors) = @{$this}{ '_logQueue', '_errQueue' };
    lock(@$logs);
    lock(@$errors);
    my $flag = $this->{'_stop'};
    $$flag = 1;
    cond_broadcast(@$logs);
    cond_broadcast(@$errors);
}
1;
xop/tsk/task.pm 100666 0 0 14561 10525602661 12266 0 ustar user group #-------------------------------------------------------------------------------
# Generic task storage container
# Coded by Sanjit Rath
# For bugs and suggestions mailto:sanjit.rath@gmail.com
#-------------------------------------------------------------------------------
package tsk::task;
use strict;
use warnings;
use File::stat;
use Time::localtime;
#create new task object
#arguments:
# src_dir  source directory, one trailing backslash is removed
# dest_dir destination directory, one trailing backslash is removed
# flags    whitespace separated list of flags, undefined means no flags
#returns: blessed task object
#dies with "Error: Unknown flag ..." on a flag that is not listed below
#
#supported flags
# -d:d/m/y copies files with modification time after the said date
# -s copies directories and subdirectoriesand files
# -e copies directories and subdirectories including empty ones
# -c continue copying even if error occures, default behaviour is to stop
# the script execution
# -i copyies files from directoriy tree to a destination directory. Here
# destination directory structure is not created
# -h copies system and hidden files
# -r overwrites read only files
# -t creates directory structure only
# -u copies files only if destination file exists
# -rx: simple wild card expression, possible values *, *.*, **
# -prx: complex perl regular expression
sub new
{
    my ( $class, $src_dir, $dest_dir, $flags ) = @_;
    chop($src_dir)  if ( substr( $src_dir,  -1 ) eq '\\' );
    chop($dest_dir) if ( substr( $dest_dir, -1 ) eq '\\' );
    my $self = {
        _source_dir                      => $src_dir,
        _destitnation_dir                => $dest_dir,
        _source_dir_length               => length($src_dir),
        _date                            => "",
        _dir_sub_dir                     => 0,
        _dir_sub_dir_empty               => 0,
        _continue_on_error               => 0,
        _copy_files_to_dir               => 0,
        _copy_system_hidden_files        => 0,
        _overwrite_readonly_files        => 0,
        _create_directory_structure_only => 0,
        _copy_only_if_destination        => 0,
        _regEx                           => "",
        #internal flags
        __if_regEx => -1    #false if regular expression is used
    };
    bless( $self, $class );

    #flags that simply switch one option on
    my %option_for = (
        '-s' => '_dir_sub_dir',
        '-e' => '_dir_sub_dir_empty',
        '-c' => '_continue_on_error',
        '-i' => '_copy_files_to_dir',
        '-h' => '_copy_system_hidden_files',
        '-r' => '_overwrite_readonly_files',
        '-t' => '_create_directory_structure_only',
        '-u' => '_copy_only_if_destination',
    );

    #split(' ') skips leading whitespace and treats runs of whitespace as
    #one separator, so "-s  -c" does not produce an empty (unknown) flag
    $flags = '' unless defined($flags);
    foreach my $flg ( split( ' ', $flags ) )
    {
        if ( exists $option_for{$flg} )
        {
            $self->{ $option_for{$flg} } = 1;
        }
        elsif ( $flg =~ /\A-d:(.+)/ )
        {
            $self->{_date} = $1;
        }
        elsif ( $flg =~ /\A-rx:(.+)/ )
        {
            #wild card expression, converted to a perl regular expression
            $self->{_regEx} = $self->__compileRegEx($1);
        }
        elsif ( $flg =~ /\A-prx:(.+)/ )
        {
            $self->{_regEx} = $1;
        }
        else
        {
            #condition for unknown flag
            die("Error: Unknown flag $flg");
        }
    }
    return $self;
}
#accessor for the source directory of the task
#arguments: none
#returns: source directory
sub __taskSourceDir {
    my $task = shift;
    return $task->{_source_dir};
}
#accessor for the destination directory of the task
#arguments: none
#returns: destination directory
sub __taskDestinationDir {
    my $task = shift;
    return $task->{_destitnation_dir};
}
#subroutine to decide if a file is to be copied as per the task flags
#arguments:
# source_file, destination file
# return value 1 if ok else 0
#checks, all of which must pass:
# - an existing destination file that is not writable is only replaced
#   when -r (overwrite read only files) was given
# - with -u the destination file must already exist
# - with -d: the source file must pass the date check
# - with -rx:/-prx: the source file name must match the expression
# TODO the -h flag (system and hidden files) is not checked yet
sub __taskAsPerFlags
{
    my ( $this, $srcFile, $destFile ) = @_;
    if ( $this->{_overwrite_readonly_files} == 0 )
    {
        #read only means "exists but cannot be written"; a test for
        #readability would reject every existing destination file
        return 0 if ( -f $destFile && !-w $destFile );
    }
    if ( $this->{_copy_only_if_destination} == 1 )
    {
        return 0 unless ( -f $destFile );
    }
    my $date = $this->{_date};
    if ( defined($date) && $date ne "" )
    {
        #no early "return 1" here, the expression below must be checked too
        return 0 unless $this->__checkFileDate($srcFile);
    }
    my $regEx = $this->{_regEx};
    if ( defined($regEx) && $regEx ne "" )
    {
        return 0 unless $this->__matchFile($srcFile);
    }
    return 1;
}
# checks the file modification date against the date of the -d: flag
# the given date format should be yyyy/mm/dd
# arguments: file path
# returns 1 if the file was modified on or after the given date, else 0
# returns 0 if the file cannot be examined
# NOTE(review): "on or after" follows the -d: description in new()
# ("modification time after the said date"); confirm the intended direction
sub __checkFileDate
{
    my ( $this, $file ) = @_;
    my $date = $this->{'_date'};
    $date = '' unless defined($date);

    #zero pad the given date so that string comparison orders dates
    if ( $date =~ m{\A(\d+)/(\d+)/(\d+)\z} )
    {
        $date = sprintf( '%04d/%02d/%02d', $1, $2, $3 );
    }

    #File::stat and Time::localtime replace stat/localtime by versions
    #returning objects, so the list returning builtins are called explicitly
    my $fmDate = ( CORE::stat($file) )[9];
    return 0 unless defined($fmDate);
    my ( $day, $month, $year ) = ( CORE::localtime($fmDate) )[ 3, 4, 5 ];

    #localtime counts years from 1900 and months from 0
    my $fileModDate = sprintf( '%04d/%02d/%02d', $year + 1900, $month + 1, $day );
    return ( $fileModDate ge $date ) ? 1 : 0;
}
#matches the file name of a path with the regex pattern of the task
# backslashes are turned into slashes, then the last path component
# is matched with the pattern
# returns 1 on match else 0
sub __matchFile {
    my ( $this, $srcFilePath ) = @_;
    $srcFilePath =~ s/\\/\//g;
    my @parts    = split( /\//, $srcFilePath );
    my $fileName = $parts[-1];
    return ( $fileName =~ /$this->{_regEx}/ ) ? 1 : 0;
}
#matches a directory path with the regex pattern of the task
# backslashes are turned into slashes before matching
# returns 1 on match else 0
sub __matchDir {
    my ( $this, $directory ) = @_;
    $directory =~ s/\\/\//g;
    return ( $directory =~ /$this->{_regEx}/ ) ? 1 : 0;
}
# takes file wild card expression and
# returns perl regex
# arguments: wild card expression, * stands for any run of characters and
#            ? for one character, every other character stands for itself
# returns: the regex, which is also stored in the _regEx field;
#          nothing if the expression is undefined or empty
# the regex is anchored so that the whole name has to match, "*.txt" does
# not accept "a.txt.bak"
sub __compileRegEx
{
    my ( $this, $strExpr ) = @_;
    return unless ( defined($strExpr) && $strExpr ne "" );

    #escape everything first, so that characters such as + ( [ $ in a file
    #name are not taken as regex syntax, then translate the wild cards
    my $pattern = quotemeta($strExpr);
    $pattern =~ s/\\\*/.*/g;
    $pattern =~ s/\\\?/./g;
    $pattern = '\A' . $pattern . '\z';

    $this->{_regEx} = $pattern;
    return $pattern;
}
#public method
# takes source file path
# the part of the path after the source directory is appended to the
# destination directory
# returns the destination path for a directory (directories are passed
# unconditionally) and for a file that meets the flag parameters
# else returns ""
sub destPath {
    my ( $this, $srcFilePath ) = @_;
    my $relative = substr( $srcFilePath, $this->{_source_dir_length} );
    my $target   = $this->{_destitnation_dir} . $relative;
    return $target if ( -d $srcFilePath );
    #TODO: check file attributes as per the flags
    return $this->__taskAsPerFlags( $srcFilePath, $target ) ? $target : "";
}
1;
xop/xcopy.pm 100666 0 0 31706 10525620440 11660 0 ustar user group #-------------------------------------------------------------------------------
# xCopy package
# coded by Sanjit Rath
# For bugs and suggestions mail to sanjit.rath at gmail.com
# April - September 2006
#-------------------------------------------------------------------------------
package xcopy;
require v5.6.0;
use warnings;
use strict;
use Config qw(%Config);
#version of the script
#a package variable, so that xcopy->VERSION and version checks can see it
our $VERSION = 0.1;
#check if the thread support is available
#done in a BEGIN block, so that the message is shown before the "use threads"
#below fails at compile time on a perl without thread support
BEGIN
{
    $Config{useithreads} or die "Recompile Perl with threads to run this program.";
}
use IO::File;
use File::Copy;
use threads;
use threads::shared;
#make ./tsk a module search directory at compile time
BEGIN {
    push @INC, './tsk';
}
#use xop package modules
use tsk::logger;
use tsk::task;
use tsk::bucket;
#constructor
#arguments: maximum number of threads, number of files per thread (bucket),
#           log file, error log file
#returns: blessed xcopy object with an empty task list, its own logger and
#         a shared worker thread counter set to 0
sub new($$@)
{
    my ( $class, $maxThreads, $tasksPerThread, $logFile, $errorLog ) = @_;
    my $workerThreadCount : shared = 0;
    my %state = (
        _maxThreads     => $maxThreads,
        _tasksPerThread => $tasksPerThread,
        _logger         => tsk::logger->new( $logFile, $errorLog ),
        _taskListRef    => [],
        _bServerStarted => 0,    #server is yet to be started
        _expandThreadId => 0,
        _workerThreads  => \$workerThreadCount,
    );
    return bless \%state, $class;
}
#adds a copy task to the task list
#arguments:
# strSrcDir  source directory (required)
# strDestDir destination directory (required)
# strFlags   task flags, see tsk::task; undefined means no flags
#dies if the source or the destination directory is undefined
sub addTask
{
    my ( $this, $strSrcDir, $strDestDir, $strFlags ) = @_;

    #every required parameter has to be defined, one missing value is enough
    #to reject the task
    unless ( defined($strSrcDir) && defined($strDestDir) )
    {
        my @shown = map { defined($_) ? $_ : 'undef' } ( $strSrcDir, $strDestDir, $strFlags );
        die("xcopy Error: undefined add task parameters @shown ");
    }
    $strFlags = '' unless defined($strFlags);
    my $tsk = tsk::task->new( $strSrcDir, $strDestDir, $strFlags );
    push( @{ $this->{_taskListRef} }, $tsk );
}
#runs every task of the task list in the calling thread
sub __runSerialCopy
{
    my ($this) = @_;

    #for each tasks in task list run file browser
    foreach my $tsk ( @{ $this->{'_taskListRef'} } )
    {
        __fileBrowser( $this, $tsk->__taskSourceDir(), $tsk );
    }
}

#private helper: copies the content of one directory, then its
#sub directories
#arguments: xcopy object, directory, task
#the xcopy object is passed as an argument; a named sub nested in
#__runSerialCopy would keep using the object of the first call only
sub __fileBrowser
{
    my ( $this, $dir, $tsk ) = @_;
    my $logger = $this->{'_logger'};

    #try opening the directory; the handle is lexical, so the recursive
    #calls cannot disturb each other
    my $dirHandle;
    unless ( opendir( $dirHandle, $dir ) )
    {
        $logger->error("Can't open $dir\n");
        return;
    }
    my @items = sort readdir($dirHandle);
    closedir($dirHandle);

    my @dirs;
    foreach my $dir_item (@items)
    {
        next if ( $dir_item eq "." || $dir_item eq ".." );
        my $complete_path = "$dir/$dir_item";
        if ( -d $complete_path )
        {
            #it is a directory, it is visited even if it cannot be created
            push( @dirs, $complete_path );
            my $dest_path = $tsk->destPath($complete_path);
            next if ( $dest_path eq "" );
            if ( $this->__makeDir($dest_path) )
            {
                $logger->log("mkdir: $dest_path");
            }
            else
            {
                $logger->error("mkdir: $dest_path");
            }
        }
        else
        {
            #it is a file
            my $dest_path = $tsk->destPath($complete_path);
            next if ( $dest_path eq "" );

            #required only if file needs to be copied
            unless ( $this->__makeDirForFile($dest_path) )
            {
                #errors go to the logger, the xcopy object has no error method
                $logger->error("mkdir: $dest_path");
                next;
            }
            if ( copy( $complete_path, $dest_path ) )
            {
                $logger->log("copy: $complete_path -> $dest_path");
            }
            else
            {
                $logger->error("Error: copy $complete_path -> $dest_path");
            }
        }
    }
    foreach my $sub_dir (@dirs)
    {
        __fileBrowser( $this, $sub_dir, $tsk );
    }
}
#runs every task of the task list with worker threads
#the files to copy are collected in buckets of _tasksPerThread files,
#each full bucket is copied by its own worker thread
sub __runParallelCopy
{
    my ($this) = @_;
    my $bucket = tsk::bucket->new();

    #for each tasks in task list run file browser
    foreach my $tsk ( @{ $this->{'_taskListRef'} } )
    {
        __fileBrowserP( $this, $tsk->__taskSourceDir(), $tsk, \$bucket );
    }

    #remaining copy jobs where jobs <<< bucket size
    if ( $bucket->has_items )
    {
        __startBucketThread( $this, $bucket );
    }
}

#thread procedure: copies every file pair of a bucket
#arguments: bucket, xcopy object
sub __threadProc
{
    my ( $bucket, $this ) = @_;
    while ( $bucket->has_items )
    {
        my ( $src, $dest ) = $bucket->pop;

        #required if only files needs to be copied
        unless ( $this->__makeDirForFile($dest) )
        {
            $this->{'_logger'}->error("Error: mkdir $dest");
            next;
        }
        if ( copy( $src, $dest ) )
        {
            $this->{'_logger'}->log("copy: $src -> $dest");
        }
        else
        {
            $this->{'_logger'}->error("Error: copy $src -> $dest");
        }
    }

    #decrement worker thread count
    $this->__removeWorkerThread();
}

#private helper: starts a worker thread for a bucket
#waits until fewer than _maxThreads workers are running, so that the new
#worker does not exceed the limit
sub __startBucketThread
{
    my ( $this, $bucket ) = @_;
    my $limit = $this->{'_maxThreads'} - 1;
    $limit = 0 if ( $limit < 0 );
    $this->__waitForWorkerThreadCount($limit);

    #count the worker before it starts, a fast worker could otherwise
    #decrement the counter before it was incremented
    $this->__addWorkerThread();
    unless ( threads->create( \&__threadProc, $bucket, $this ) )
    {
        $this->__removeWorkerThread();
        die("Fatal: Couldn't create thread for parallel run");
    }
}

#private helper: collects the files of one directory in the current
#bucket, then visits the sub directories
#arguments: xcopy object, directory, task, reference to the current bucket
#the object and the bucket are passed as arguments; a named sub nested in
#__runParallelCopy would keep using the variables of the first call only
sub __fileBrowserP
{
    my ( $this, $dir, $tsk, $bucketRef ) = @_;
    my $logger = $this->{'_logger'};

    #try opening the directory
    my $dirHandle;
    unless ( opendir( $dirHandle, $dir ) )
    {
        $logger->error("Can't open $dir");
        return;
    }
    my @items = sort readdir($dirHandle);
    closedir($dirHandle);

    my @dirs;
    foreach my $dir_item (@items)
    {
        next if ( $dir_item eq "." || $dir_item eq ".." );
        my $complete_path = "$dir/$dir_item";
        my $dest_path     = $tsk->destPath($complete_path);
        if ( -d $complete_path )
        {
            #it is a directory dont schedule in the job queue
            push( @dirs, $complete_path );
            next if ( $dest_path eq "" );
            if ( $this->__makeDir($dest_path) )
            {
                $logger->log("mkdir: $dest_path");
            }
            else
            {
                $logger->error("Error: mkdir $dest_path");
            }
        }
        else
        {
            #it is a file
            next if ( $dest_path eq "" );
            $$bucketRef->push( $complete_path, $dest_path );
            if ( $$bucketRef->size() == $this->{'_tasksPerThread'} )
            {
                __startBucketThread( $this, $$bucketRef );

                #create a new bucket object for next thread
                print "creating bucket..\n";
                $$bucketRef = tsk::bucket->new();
            }
        }
    }
    foreach my $sub_dir (@dirs)
    {
        __fileBrowserP( $this, $sub_dir, $tsk, $bucketRef );
    }
}
# sub to wait for threads to finish execution
# input nothing, returns after threads have finished
# order of shutdown:
#  1. wait until the worker thread counter is back to 0
#  2. stop the logger, the workers cannot queue messages any more
#  3. join every thread
# stopping the logger first would lose the messages of workers that are
# still copying
sub __waitForThreads
{
    my ($this) = @_;
    my $counter = $this->{'_workerThreads'};
    {
        lock($$counter);
        cond_wait($$counter) until ( $$counter <= 0 );
    }
    $this->{'_logger'}->stop();

    #collect the worker threads and the logger threads
    foreach my $thr ( threads->list() )
    {
        $thr->join();
    }
}
#returns the worker thread counter, read while holding its lock
sub __workerThreadCount()
{
    my $self    = shift;
    my $counter = $self->{'_workerThreads'};
    lock($$counter);
    return $$counter;
}
#increments the worker thread counter while holding its lock
#a waiting thread is signalled only if the counter was not 0 before
sub __addWorkerThread()
{
    my $self    = shift;
    my $counter = $self->{'_workerThreads'};
    lock($$counter);
    my $before = $$counter++;
    cond_signal($$counter) if $before;
}
#decrements the worker thread counter while holding its lock
#a waiting thread is signalled only if the counter was not 0 before
sub __removeWorkerThread()
{
    my $self    = shift;
    my $counter = $self->{'_workerThreads'};
    lock($$counter);
    my $before = $$counter--;
    cond_signal($$counter) if $before;
}
#blocks until the worker thread counter is at most waitCount
#prints a progress line before and after waiting
sub __waitForWorkerThreadCount($)
{
    my ( $this, $waitCount ) = @_;
    my $counter = $this->{'_workerThreads'};
    print "waiting for ..", $$counter, "and", $waitCount, "\n";
    lock($$counter);
    until ( $$counter <= $waitCount )
    {
        cond_wait($$counter);
    }
    print "wait done $$counter\n";
}
#sub to make directory tree
#inputs directory, backslashes are taken as path separators
#returns true if the directory exists or is created, false otherwise
#a failure is reported to the error log
#File::Path::mkpath creates the missing parent directories; it handles
#absolute paths such as /a/b and directories that another thread creates
#at the same moment
sub __makeDir
{
    my ( $this, $dir ) = @_;
    $dir =~ s/\\/\//g;

    #if the directory exist return
    return 1 if ( -d $dir );

    require File::Path;
    local $@;
    my $created = eval { File::Path::mkpath( $dir, 0, 0777 ); 1 };
    unless ( $created && -d $dir )
    {
        $this->{'_logger'}->error("Error: failed to make directory $dir\n");
        return 0;
    }
    return 1;
}
#sub to make directory tree out of FilePath
#input: File Path, backslashes are taken as path separators
#returns true if the directory of the file exists or is created,
#false otherwise; a failure is reported to the error log
#a file name without directory refers to the current directory, which
#needs no creation
sub __makeDirForFile
{
    my ( $this, $file ) = @_;
    $file =~ s/\\/\//g;

    require File::Basename;
    my $dir = File::Basename::dirname($file);

    #if the directory exist return
    return 1 if ( -d $dir );

    #mkpath creates the missing parents, also for absolute paths, and
    #accepts a directory created by another thread in the meantime
    require File::Path;
    local $@;
    my $created = eval { File::Path::mkpath($dir); 1 };
    unless ( $created && -d $dir )
    {
        $this->{'_logger'}->error("Error: failed to make directory $dir\n");
        return 0;
    }
    return 1;
}
#runs the queued tasks
#starts the logger, copies serially when _maxThreads is 0 and with worker
#threads otherwise, then waits for the threads to finish
sub run()
{
    my ($this) = @_;
    $this->{'_logger'}->start();
    unless ( $this->{'_maxThreads'} == 0 )
    {
        print "running parallel copy\n";
        $this->__runParallelCopy();
        print "out of copy";
    }
    else
    {
        print "running serial copy\n";
        $this->__runSerialCopy();
    }
    $this->__waitForThreads();
}
1;
__END__
=head1 NAME
Module xop::xcopy
This is a generic XCOPY implementation in Perl with platform-independent standard
features and many improvements. It uses a task concept: each task is
expanded into subtasks, the subtasks are grouped into buckets, and the buckets are executed
in parallel or in serial as per the arguments to the task.
It is designed for very large file copies, typically used in SCM (Source Code Management)
environments like ClearCase, CVS and Oracle ADE, where the time required for the copy and
its accuracy are most critical.
Features as of version 0.1
i. Stable task execution
ii. Serial & Parallel XCOPY
iii. Log file generation for each task
=head1 DESCRIPTION
xop::xcopy
Concepts:
xcopy works with a thread and bucket concept: a bucket represents a group of tasks, which
are executed by a single thread. This has been designed keeping in mind the following
factors
i. Prevent thread rush for accessing shared tasks:
the possible approaches to the problem are
a) constant number of running thread, and variable number of file copy sub-tasks
if time required for the sub-tasks are small there is possibility of thread rush
where most CPU time is consumed by the running threads
b) constant number of running threads and constant number of file copy sub-tasks
Here also if time required for copy task is small, there is potential thread
rush problem, secondly constant running threads consume CPU time
c) Create a thread for a constant number of file copy sub-tasks (bucket) up a
constant number of threads (number of threads)
This approach solves thread rush as well as most of the CPU time is given
for file copy sub-tasks
Requirement:
perl version 5.8 and higher although it will work with 5.6 and higher
This is because of the improved thread in perl in higher versions.
Usage:
use xop::xcopy;
my $cp = xop::xcopy->new(<max threads>, <tasks per thread>, <log>, <error log>);
<max threads> : if 0,1 initiates serial copy, no threads are created
if >1, n, initiates parallel copy with 'n' threads running
<tasks per thread> : number of tasks to be grouped for copy, this number is relevant only if
running a parallel copy, i.e. <max threads> is set to > 1
< log > : log file
< error log > : error log file
$cp->addTask(