#!/usr/bin/perl
#
# mysqlsync v1.0-alpha (a.k.a the mysql-syncer-upper-script-thingy)
# keep remote copies of a mysql database in sync with a master
# by mark jeftovic <markjr@easyDNS.com>
# copyright feb/1999 easyDNS Technologies Inc.
#
# All rights reserved.
#
# This code provided "As Is" with no warrantees express or implied.
# The author and contributors are not liable for anything good or
# bad that results from your use of this code.
#
# You are free to distribute this for free provided this notice is
# included. Please forward fixes/enhancements to the author for
# inclusion in the next revision.
#                                                                 

$VERSION = "v1.0-alpha";
$MAINTAINER = 'markjr@easydns.com';

use Mysql;
use Getopt::Std;
use Sys::Hostname; $thishost = hostname;   
use File::Basename;

@SAVE_ARGV=@ARGV;
$script = basename($0);

getopts("d:t:h:l:o:vs:bu");

if($opt_u) {
print<<"EOF";
$script $VERSION (a.k.a the mysql-syncer-upper-script-thingy) 
email: $MAINTAINER

Usage: $script -d <dbase> -h <hosts> -t <tables> \\
	[ -o <operations> -l <mysql_log> -s <offset,whence> -b -v -u ]

Where:
	-d database to keep in sync 
	-h comma seperated list of remote hosts
	-t comma seperated list of tables

Optional switches:
	-o comma seperated list of operations to sync 
	   (defaults to: update,insert,delete)
	-l alternate mysql_log
	-s start from offset relative to whence
	   (defaults to value in \$OFFSET_LOG,0 if present, otherwise 80,2)
	-b fork a daemon
	-v verbose mode (logs operations to \$SYNC_LOG)
	-u this message
EOF
exit;
}

$MYSQL_DIR	= "/export/mysql";
$DEFAULT_LOG 	= sprintf("%s/var/%s.log", $MYSQL_DIR, $thishost);
$OFFSET_LOG	= $MYSQL_DIR."/mysqlsync.offset";
$PID_FILE	= $MYSQL_DIR."/mysqlsync.pid";
$SYNC_LOG	= $MYSQL_DIR."/mysqlsync.log";
$DEFAULT_OFFSET	= 80;
$DEBUG_LEVEL	= 0;
$SLEEP		= 1;	# sleep value between log polls
$PASSWD		= '';	# chmod this script appropriately if you set this

$MYSQL_LOG 	= $opt_l ? $opt_l : $DEFAULT_LOG; 
@OP		= $opt_o ? split(/\,/,$opt_o) : qw(insert update delete);

$opt_h ? @HOST = split(/\,/,$opt_h) : die "Need host list use \"$script -u\" for usage";
$opt_t ? @TABLE = split(/\,/,$opt_t) : die "Need table list use \"$script -u\" for usage";
$opt_d ? @DBASE = split(/\,/,$opt_d) : die "Need dbase use \"$script -u\" for usage";

if(@DBASE>1) {
	print "$script $VERSION does not yet support multiple databases\n";
	print "run another process for each additional database.\n";
	exit(-1);
	}

if(-f $SYNC_LOG){ open(SYNC_LOG, ">>$SYNC_LOG") or die $!; }
else { open(SYNC_LOG, ">$SYNC_LOG") or die $!; }
select SYNC_LOG; $|=1;

&logit("mysqlsync $VERSION <$MAINTAINER> starting.");
&write_pid($PID_FILE);

($offset,$pos)=&get_offset();

BEGIN:
open(LOG, $MYSQL_LOG) or die $!;
seek(LOG, $offset, $pos);

&logit("Seeking to $offset, relative to $pos in $MYSQL_LOG") if($opt_v);
&init_signals; 
&daemon if($opt_b);

for(;;) {
	while($line=<LOG>){
		print STDERR $line if($DEBUG_LEVEL>=3);
		# if($line=~/\d+\s+\d\d:\d\d:\d\d\s+(\d+)\sConnect\s+(.+)@(.+)\s+on/) {
		if($line=~/\s+(\d+)\sConnect\s+(.+)@(.+)\s+on/) {
			if(!grep(/^$3$/,@HOST)){
				${$1}{host}=$3;
				${$1}{user}=$2;
				print STDERR "host: ${$1}{host} / ${$1}{user}\n" if($DEBUG_LEVEL);
				}
			}
		elsif($line=~/\d+\s+\d\d:\d\d:\d\d\s(\d+)\sQuit/) {
			if(defined(%{$1})) { undef(%{$1}); }
			}
		elsif($line=~/\s+(\d+)\sInit\sDB\s+\W(.+)\W/) {
			if(defined(%{$1})){
				if(grep(/^$2$/,@DBASE)) {
					${$1}{dbase}=$2;
					print STDERR "dbase: ${$1}{dbase}\n" if($DEBUG_LEVEL);
					}
				 # else	{ undef(%{$1}); }
				}
			}
		elsif($line=~/\s+(\d+)\s+Query\s+(\w+)\s+(.+)$/) {
			if(defined(%{$1})){ 
				if(grep(/^$2$/i,@OP)) {  
					${$1}{op}=$2;
					print STDERR "op: ${$1}{op} ($3)\n" if($DEBUG_LEVEL);
					${$1}{table}=get_tbl(${$1}{op}, $3);
					print STDERR "table: ${$1}{table}\n" if($DEBUG_LEVEL);
					if(grep(/^${$1}{table}$/,@TABLE)) {
						${$1}{query}=sprintf("%s %s", ${$1}{op}, $3);
						foreach $h (@HOST)  { 
							print STDERR "$h/${$1}{dbase}/${$1}{user}: ${$1}{query}\n" if($DEBUG_LEVEL);
							&logit("On $h: ".${$1}{query}) if($opt_v); 
							$dbh = Mysql->Connect($h, ${$1}{dbase}, ${$1}{user},$PASSWD);
							eval '$sth = $dbh->Query("${$1}{query}") || &logit("ERROR: ".$dbh->errmsg)';
							&logit("ERROR: ".$@) if($@);
							undef $dbh;
							}
						}
					}
				}
			}
		seek(LOG,0,1);
		$cur_log_pos = tell LOG;
		print STDERR "Seeking to $cur_log_pos\n" if($DEBUG_LEVEL>=2);
		}
	$cur_log_size = (stat $MYSQL_LOG)[7];
	if($cur_log_size<$cur_log_pos) {
		&logit("\$cur_log_size < \$cur_log_pos ($cur_log_size/$cur_log_pos), log rotated?") if($opt_v);
		&logit("Resetting log pointer");
		close(LOG);
		$offset=$pos=$cur_log_pos=0;
		goto BEGIN;
		}
	sleep($SLEEP);
	}
	
sub write_pid {
my($pid_file)=@_;
open(PID, ">$pid_file") || warn $!;
print PID $$;
close(PID);        
}

sub get_tbl {
my($op, $query)=@_;
if($op =~ /^update$/i) { $query =~ /^\s*(\w+)\s+set\s+.+/; return($1); }
elsif ($op =~ /^delete$/i) { $query =~ /^from\s+(\w+)\s+.*/; return($1); }
elsif ($op =~ /^insert$/i) { $query =~ /^into\s+(\w+)\s+.*/; return($1); }
elsif ($op =~ /^drop$/i) { $query =~ /^table\s+(\w+)\s+.*/; return($1); }

# select serves no real purpose in sync-ing other than debugging 
elsif ($op =~ /^select$/i) { $query =~ /^.+\s+from\s+(\w+)\s+.*/; return($1); }
return(undef);
}

sub get_offset {
if($opt_s) { ($offset,$pos)=split(/\,/,$opt_s); }
elsif(-f $OFFSET_LOG) {
        $pos = 0;
        open(OFFSET, $OFFSET_LOG) || die $!;
        chomp($offset=<OFFSET>);
        close(OFFSET);
        }
else    {
        print STDERR "warning: no $OFFSET_LOG and no -s option, using $DEFAULT_OFFSET\n";
        $pos = 2;
        $offset = $DEFAULT_OFFSET;
        }                                      
return($offset,$pos);
}

sub init_signals {
    $SIG{'INT'} = \&do_kill;
    $SIG{'QUIT'} = \&do_kill;
    $SIG{'TERM'} = \&do_kill;
}

sub logit {
my($msg)=@_;
chomp($msg);
chomp($now=scalar localtime(time));
print SYNC_LOG "[$now] $msg\n";
}

sub do_kill {
&logit("Caught SIGTERM -exiting.") if($opt_v);
open(OFFSET, ">$OFFSET_LOG") || die $!;
print OFFSET tell(LOG);
close(OFFSET);
exit;
}

# this routine was pilfered from lbnamed -mark
sub daemon {
    local(*TTY,*NULL);

    exit(0) if (fork);
    write_pid($PID_FILE);
    if (open(NULL,"/dev/null")) {
        open(STDIN,">&NULL") || close(STDIN);
        open(STDOUT,">&NULL") || close(STDOUT);
        open(STDERR,">&NULL") || close(STDERR);
    } else {
        close(STDIN);
        close(STDOUT);
        close(STDERR);
    }
    eval 'require "sys/ioctl.ph";';
    return if !defined(&TIOCNOTTY);
    open(TTY,"+>/dev/tty") || return;
    ioctl(TTY,&TIOCNOTTY,0);
    close(TTY);
}