#!/usr/bin/perl

use strict;
use warnings;

use Fcntl qw(:flock SEEK_END);
use Getopt::Long qw(GetOptionsFromArray);
use File::Path qw(make_path);
use JSON;
use IO::File;
use String::ShellQuote 'shell_quote';
use Text::ParseWords;

# Program name; the state directory, cron file and binary path below are derived from it.
my $PROGNAME = "pve-zsync";
# Directory that holds the state file and the lock files used by this script.
my $CONFIG_PATH = "/var/lib/${PROGNAME}";
# JSON file with the per-job sync state (see read_state/update_state).
my $STATE = "${CONFIG_PATH}/sync_state";
# cron.d file holding one line per configured job (see read_cron/update_cron).
my $CRONJOBS = "/etc/cron.d/$PROGNAME";
my $PATH = "/usr/sbin";
# Directory containing the guest configuration sub-directories below.
my $PVE_DIR = "/etc/pve/local";
# Guest configuration files are looked up as "<vmid>.conf" in these two directories.
my $QEMU_CONF = "${PVE_DIR}/qemu-server";
my $LXC_CONF = "${PVE_DIR}/lxc";
my $PROG_PATH = "$PATH/${PROGNAME}";
# Minute interval used for the cron schedule ("*/$INTERVAL * * * *") of newly formatted jobs.
my $INTERVAL = 15;
my $DEBUG;

# Runs at compile time: $DEBUG is initialised here (and not on the declaration
# above, whose initialiser would only run later and overwrite the value) so that
# Data::Dumper can be loaded and imported before the rest of the file is compiled.
BEGIN {
    $DEBUG = 0; # change default here. not above on declaration!
    # the ZSYNC_DEBUG environment variable enables debugging when the default is off
    $DEBUG ||= $ENV{ZSYNC_DEBUG};
    if ($DEBUG) {
        require Data::Dumper;
        Data::Dumper->import();
    }
}

# Building blocks for address matching. These are plain strings that get
# interpolated into the larger patterns below.
# One IPv4 octet, 0-255 without leading zeros.
my $IPV4OCTET = "(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])";
# Dotted-quad IPv4 address.
my $IPV4RE = "(?:(?:$IPV4OCTET\\.){3}$IPV4OCTET)";
# One group of 1-4 hex digits of an IPv6 address.
my $IPV6H16 = "(?:[0-9a-fA-F]{1,4})";
# Last 32 bits of an IPv6 address: either an embedded IPv4 address or two hex groups.
my $IPV6LS32 = "(?:(?:$IPV4RE|$IPV6H16:$IPV6H16))";

# IPv6 address; one alternative per allowed position of the "::" abbreviation.
#<<< make perltidy ignore this
my $IPV6RE = "(?:" .
    "(?:(?:" .                             "(?:$IPV6H16:){6})$IPV6LS32)|" .
    "(?:(?:" .                           "::(?:$IPV6H16:){5})$IPV6LS32)|" .
    "(?:(?:(?:" .              "$IPV6H16)?::(?:$IPV6H16:){4})$IPV6LS32)|" .
    "(?:(?:(?:(?:$IPV6H16:){0,1}$IPV6H16)?::(?:$IPV6H16:){3})$IPV6LS32)|" .
    "(?:(?:(?:(?:$IPV6H16:){0,2}$IPV6H16)?::(?:$IPV6H16:){2})$IPV6LS32)|" .
    "(?:(?:(?:(?:$IPV6H16:){0,3}$IPV6H16)?::(?:$IPV6H16:){1})$IPV6LS32)|" .
    "(?:(?:(?:(?:$IPV6H16:){0,4}$IPV6H16)?::" .           ")$IPV6LS32)|" .
    "(?:(?:(?:(?:$IPV6H16:){0,5}$IPV6H16)?::" .            ")$IPV6H16)|" .
    "(?:(?:(?:(?:$IPV6H16:){0,6}$IPV6H16)?::" .                    ")))";
#>>>

my $HOSTv4RE0 = "(?:[\\w\\.\\-_]+|$IPV4RE)"; # hostname or ipv4 address
my $HOSTv4RE1 = "(?:$HOSTv4RE0|\\[$HOSTv4RE0\\])"; # these may be in brackets, too
my $HOSTRE = "(?:$HOSTv4RE1|\\[$IPV6RE\\])"; # ipv6 must always be in brackets
# targets are either a VMID, or a 'host:zpool/path' with 'host:' being optional
# Captures: $1 = host (without the trailing ':'), $2 = VMID or complete dataset,
# $3 = the '/...' part following the first dataset component.
my $TARGETRE = qr!^(?:($HOSTRE):)?(\d+|(?:[\w\-_]+)(/.+)?)$!;

# Matches the key of a guest config line that describes a disk, mount point or
# root filesystem, including the ': ' separator that follows the key.
my $DISK_KEY_RE = qr/^(?:(?:(?:virtio|ide|scsi|sata|efidisk|tpmstate|mp)\d+)|rootfs): /;

# Identifier of the running process ("pid:starttime:boot_id"); sync() stores it in
# the job state so that a later run can tell whether the recorded process is still alive.
my $INSTANCE_ID = get_instance_id($$);

# first command line argument selects the sub-command
my $command = $ARGV[0];

# Every command except 'help' and 'printpod' requires these external tools;
# check_bin() dies when one of them cannot be found in $ENV{PATH}.
if (defined($command) && $command ne 'help' && $command ne 'printpod') {
    check_bin('cstream');
    check_bin('zfs');
    check_bin('ssh');
    check_bin('scp');
}

# Convert the listed signals into a die() so that an interrupted sync unwinds
# through the regular eval/error handling instead of terminating abruptly.
# NOTE(review): SIGKILL cannot be caught by a process, so the KILL entry
# presumably has no effect - confirm before relying on it.
$SIG{TERM} = $SIG{QUIT} = $SIG{PIPE} = $SIG{HUP} = $SIG{KILL} = $SIG{INT} = sub {
    die "Signaled, aborting sync: $!\n";
};

# Look for an executable named $bin in the directories listed in $ENV{PATH}.
# Returns the full path of the first executable found; dies if there is none.
sub check_bin {
    my ($bin) = @_;

    for my $dir (split(/:/, $ENV{PATH})) {
        my $candidate = "$dir/$bin";
        return $candidate if -x $candidate;
    }

    die "unable to find command '$bin'\n";
}

# Read $filename and return its content.
# With a true $one_line_only only the first line is returned (as a string),
# otherwise an array reference holding all lines. Dies if the file cannot be opened.
sub read_file {
    my ($filename, $one_line_only) = @_;

    my $fh = IO::File->new($filename, "r");
    die "Could not open file ${filename}: $!\n" if !$fh;

    my $text;
    if ($one_line_only) {
        $text = <$fh>;
    } else {
        $text = [<$fh>];
    }

    close($fh);

    return $text;
}

# Shorten $path for display so that it is at most $maxlen characters long,
# marking omitted parts with "..". Repeated slashes are collapsed first and a
# path that already fits is returned as is.
sub cut_target_width {
    my ($path, $maxlen) = @_;
    # collapse runs of slashes into a single slash
    $path =~ s@/+@/@g;

    return $path if length($path) <= $maxlen;

    # no slash at all: keep only the end of the string, prefixed with '..'
    return '..' . substr($path, -$maxlen + 2) if $path !~ m@/@;

    # cut the last component (with an optional trailing slash) off $path
    $path =~ s@/([^/]+/?)$@@;
    my $tail = $1;

    if (length($tail) + 3 == $maxlen) {
        # the tail plus "../" fills the available width exactly
        return "../$tail";
    } elsif (length($tail) + 2 >= $maxlen) {
        # the tail alone is too long: keep only its end, prefixed with '..'
        return '..' . substr($tail, -$maxlen + 2);
    }

    # cut the first "/component" match off what is left of $path
    $path =~ s@(/[^/]+)(?:/|$)@@;
    my $head = $1;
    my $both = length($head) + length($tail);
    my $remaining = $maxlen - $both - 4; # -4 for "/../"

    if ($remaining < 0) {
        # head and tail do not both fit: truncate the head
        return substr($head, 0, $maxlen - length($tail) - 3) . "../$tail"; # -3 for "../"
    }

    # 4-argument substr: replace part of the remaining middle section with '..' in place
    substr($path, ($remaining / 2), (length($path) - $remaining), '..');
    return "$head/" . $path . "/$tail";
}

# Run $code while holding an exclusive flock() on the file $lock_fn.
#
# $lock_fn - path of the lock file; it is created (or truncated) if necessary
# $code    - code reference to execute while the lock is held
#
# Returns whatever $code returns (evaluated in scalar context). Dies if the lock
# file cannot be opened or locked; an exception thrown by $code is re-thrown
# after the lock has been released and the handle has been closed.
sub locked {
    my ($lock_fn, $code) = @_;

    # Pass the mode separately so the file name is never parsed for mode
    # characters, and report a failed open instead of calling flock() on undef.
    my $lock_fh = IO::File->new($lock_fn, 'w')
        or die "Could not open lock file $lock_fn: $!\n";

    flock($lock_fh, LOCK_EX) || die "Couldn't acquire lock - $!\n";
    my $res = eval { $code->() };
    my $err = $@;

    flock($lock_fh, LOCK_UN) || warn "Error unlocking - $!\n";
    # close on both the success and the error path
    close($lock_fh);

    die "$err" if $err;

    return $res;
}

# Return $status if it holds a true 'status' value for the dataset $source->{all}
# and the job $name, otherwise undef.
sub get_status {
    my ($source, $name, $status) = @_;

    my $has_status = $status->{ $source->{all} }->{$name}->{status};

    return $has_status ? $status : undef;
}

# Check whether $dataset can be listed with 'zfs list', either locally or, when
# $ip is set, via ssh as $user@$ip. Returns 1 if the command succeeds, 0 otherwise.
sub check_dataset_exists {
    my ($dataset, $ip, $user) = @_;

    my @remote = $ip ? ('ssh', "$user\@$ip", '--') : ();

    eval { run_cmd([@remote, 'zfs', 'list', '-H', '--', $dataset]); };

    return $@ ? 0 : 1;
}

# Run 'zfs create' for $file_system, either locally or, when $ip is set, via ssh
# as $user@$ip. Errors from run_cmd() propagate to the caller.
sub create_file_system {
    my ($file_system, $ip, $user) = @_;

    my @cmd = ('zfs', 'create', $file_system);
    unshift @cmd, 'ssh', "$user\@$ip", '--' if $ip;

    run_cmd(\@cmd);
}

# Parse a target specification of the form [host:]<VMID> or [host:]<pool>[/path].
#
# Returns a hash reference with these keys:
#   all       - everything after the optional 'host:' prefix
#   ip        - the host, without surrounding brackets (only if one was given)
#   vmid      - set instead of 'pool' when the first component is purely numeric
#   pool      - first component of the dataset
#   last_part - last component, if there is more than one
#   path      - the components in between, if any are left
# Dies if $text does not match $TARGETRE.
sub parse_target {
    my ($text) = @_;

    my $errstr = "$text : is not a valid input! Use [IP:]<VMID> or [IP:]<ZFSPool>[/Path]";

    die "$errstr\n" if $text !~ $TARGETRE;
    my ($host, $all) = ($1, $2);

    my $target = { all => $all };

    if ($host) {
        # hosts may be written in brackets, store them without
        $host =~ s/^\[(.*)\]$/$1/;
        $target->{ip} = $host;
    }

    my @parts = split('/', $all);

    my $pool = shift(@parts);
    die "$errstr\n" if !$pool;

    if ($pool =~ m/^\d+$/) {
        $target->{vmid} = $pool;
    } else {
        $target->{pool} = $pool;
    }

    if (@parts) {
        $target->{last_part} = pop(@parts);

        # for targets with a host one more component is dropped from the path
        pop(@parts) if $target->{ip};

        $target->{path} = join('/', @parts) if @parts;
    }

    return $target;
}

# Read the cron file and return the parsed job configuration (see parse_cron).
# On first use the cron file does not exist yet; it is then created empty and
# undef is returned.
sub read_cron {

    if (-e $CRONJOBS) {
        my $lines = read_file($CRONJOBS, 0);
        return parse_cron(@{$lines});
    }

    # first use: initialise the file
    my $new_fh = IO::File->new("> $CRONJOBS");
    die "Could not create $CRONJOBS: $!\n" if !$new_fh;
    close($new_fh);

    return undef;
}

# Parse command line style arguments into a parameter hash.
#
# Every known option gets a key in the returned hash reference (dashes in the
# option name become underscores); options that were not given are undef, apart
# from name, maxsnap, method, source_user and dest_user, which get defaults.
# Dies if Getopt::Long reports a parse error.
sub parse_argv {
    my (@arg) = @_;

    # Getopt::Long specifications of all supported options
    my @specs = qw(
        dest=s
        source=s
        verbose
        limit=i
        maxsnap=i
        dest-maxsnap=i
        name=s
        skip
        method=s
        source-user=s
        dest-user=s
        prepend-storage-id
        compressed
        properties
        dest-config-path=s
    );

    my $param = {};
    my @bindings;
    for my $spec (@specs) {
        # 'dest-maxsnap=i' is stored under the key 'dest_maxsnap'
        (my $key = $spec) =~ s/=.*$//;
        $key =~ tr/-/_/;

        $param->{$key} = undef;
        push @bindings, $spec => \$param->{$key};
    }

    my ($ret) = GetOptionsFromArray(\@arg, @bindings);

    die "can't parse options\n" if $ret == 0;

    my %default = (
        name => "default",
        maxsnap => 1,
        method => "ssh",
        source_user => "root",
        dest_user => "root",
    );
    $param->{$_} //= $default{$_} for keys %default;

    return $param;
}

# Copy the persisted state of $job (looked up by its source and name in the state
# file) into the job hash: state, lsync, vm_type, instance_id and the consecutive
# snap0, snap1, ... entries. Returns the same $job reference.
sub add_state_to_job {
    my ($job) = @_;

    my $states = read_state();
    my $state = $states->{ $job->{source} }->{ $job->{name} };

    $job->{$_} = $state->{$_} for qw(state lsync vm_type instance_id);

    # snapshot entries are numbered consecutively; stop at the first missing one
    my $i = 0;
    while ($state->{"snap$i"}) {
        $job->{"snap$i"} = $state->{"snap$i"};
        $i++;
    }

    return $job;
}

# Turn the lines of the cron file into a configuration hash of the form
# $cfg->{<source>}->{<name>} = <remaining parameters>.
# Each line is split into shell words and handed to parse_argv(); only lines
# that yield both a source and a dest are recorded. Parsing stops at the first
# entry that is false (e.g. an empty string).
sub parse_cron {
    my (@text) = @_;

    my $cfg = {};

    for my $line (@text) {
        last if !$line;

        my $param = parse_argv(Text::ParseWords::shellwords($line));

        next if !($param->{source} && $param->{dest});

        my $source = delete $param->{source};
        my $name = delete $param->{name};

        $cfg->{$source}->{$name} = $param;
    }

    return $cfg;
}

# Build a job hash from the parsed command line parameters.
# The method is "local" when neither source nor dest has a host part and "ssh"
# otherwise; the three flag options are normalised to booleans. 'dest' and
# 'dest_config_path' are only set when they were given.
sub param_to_job {
    my ($param) = @_;

    my $source = parse_target($param->{source});
    my $dest;
    $dest = parse_target($param->{dest}) if $param->{dest};

    my $is_local = !$dest->{ip} && !$source->{ip};

    my $job = {
        name => $param->{name} ? $param->{name} : "default",
        method => $is_local ? "local" : "ssh",
        limit => $param->{limit},
        maxsnap => $param->{maxsnap},
        dest_maxsnap => $param->{dest_maxsnap},
        source => $param->{source},
        source_user => $param->{source_user},
        dest_user => $param->{dest_user},
        prepend_storage_id => !!$param->{prepend_storage_id},
        compressed => !!$param->{compressed},
        properties => !!$param->{properties},
    };

    for my $optional (qw(dest dest_config_path)) {
        $job->{$optional} = $param->{$optional} if $param->{$optional};
    }

    return $job;
}

# Read the state file and return the decoded JSON structure.
# If the state file does not exist yet, the configuration directory and an empty
# state file ("{}") are created and undef is returned.
sub read_state {

    if (-e $STATE) {
        my $json = read_file($STATE, 1);
        return decode_json($json);
    }

    make_path($CONFIG_PATH);

    my $new_fh = IO::File->new("> $STATE");
    die "Could not create $STATE: $!\n" if !$new_fh;
    print $new_fh "{}";
    close($new_fh);

    return undef;
}

# Persist the state of $job in the state file.
#
# For a job whose state is "del" the entry (and the source entry, once it is
# empty) is removed; otherwise state, lsync, instance_id, vm_type and the
# consecutive snapN entries of $job are stored under {source}->{name}.
#
# The new content is written to "$STATE.new" and then renamed over the state
# file. Every step is checked, so a failed write can no longer replace a valid
# state file with a truncated one. Returns the complete, updated state structure.
sub update_state {
    my ($job) = @_;

    # a missing or unreadable state file is treated like an empty one
    my $text = eval { read_file($STATE, 1); };

    my $states = {};
    my $state = {};
    if ($text) {
        $states = decode_json($text);
        $state = $states->{ $job->{source} }->{ $job->{name} } // {};
    }

    if ($job->{state} ne "del") {
        $state->{state} = $job->{state};
        $state->{lsync} = $job->{lsync};
        $state->{instance_id} = $job->{instance_id};
        $state->{vm_type} = $job->{vm_type};

        for (my $i = 0; $job->{"snap$i"}; $i++) {
            $state->{"snap$i"} = $job->{"snap$i"};
        }
        $states->{ $job->{source} }->{ $job->{name} } = $state;
    } else {

        delete $states->{ $job->{source} }->{ $job->{name} };
        delete $states->{ $job->{source} } if !keys %{ $states->{ $job->{source} } };
    }

    # Open the temporary file only after the old state was decoded, so a decoding
    # error does not leave an empty "$STATE.new" behind.
    my $out_fh = IO::File->new("> $STATE.new");
    die "Could not open file ${STATE}.new: $!\n" if !$out_fh;

    print {$out_fh} encode_json($states) or die "can't write to $STATE.new: $!\n";
    # buffered write errors only show up on close
    close($out_fh) or die "can't close $STATE.new: $!\n";

    rename("$STATE.new", $STATE) or die "can't move $STATE.new: $!\n";

    return $states;
}

# Rewrite the cron file for $job.
#
# The line belonging to the job (identified by its source and name) is replaced
# by a freshly formatted one, or dropped if the job state is "del". If no such
# line exists a new one is appended. A SHELL/PATH header is prepended when the
# file does not start with one. The result is written to "${CRONJOBS}.new" and
# renamed over the cron file; dies on any write error.
sub update_cron {
    my ($job) = @_;

    my $updated;
    my $has_header;
    my $line_no = 0;
    my $text = "";
    my $header = "SHELL=/bin/sh\n";
    $header .= "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n\n";

    my $current = read_file($CRONJOBS, 0);

    foreach my $line (@{$current}) {
        chomp($line);
        # \Q...\E: source and name are literal text, not patterns. Without quoting,
        # '.' in addresses matches any character and a bracketed IPv6 host is
        # interpreted as a character class.
        if ($line =~ m/source \Q$job->{source}\E .*name \Q$job->{name}\E /) {
            $updated = 1;
            next if $job->{state} eq "del";
            $text .= format_job($job, $line);
        } else {
            # the header is only recognised within the first three lines
            if (($line_no < 3) && ($line =~ /^(PATH|SHELL)/)) {
                $has_header = 1;
            }
            $text .= "$line\n";
        }
        $line_no++;
    }

    if (!$has_header) {
        $text = "$header$text";
    }

    if (!$updated) {
        $text .= format_job($job);
    }
    my $new_fh = IO::File->new("> ${CRONJOBS}.new");
    die "Could not open file ${CRONJOBS}.new: $!\n" if !$new_fh;

    print $new_fh $text or die "can't write to $CRONJOBS.new: $!\n";
    # buffered write errors only show up on close
    close($new_fh) or die "can't close $CRONJOBS.new: $!\n";

    rename "${CRONJOBS}.new", $CRONJOBS or die "can't move $CRONJOBS.new: $!\n";
}

# Format the cron line for $job.
#
# $job  - job hash; a state of "stopped" produces a commented-out line
# $line - optional existing cron line of the job, whose schedule (the five time
#         fields in front of the 'root' user field) is kept
#
# Without $line, or when the schedule cannot be extracted from it, the default
# schedule "*/$INTERVAL * * * *" is used. Returns the line including the
# trailing newline.
sub format_job {
    my ($job, $line) = @_;
    my $text = "";

    if ($job->{state} eq "stopped") {
        $text = "#";
    }
    # Only use the capture after a successful match; otherwise $1 is undefined and
    # the resulting cron line would have no schedule at all.
    if ($line && $line =~ /^#*\s*((?:\S+\s+){4}\S+)\s+root/) {
        $text .= $1;
    } else {
        $text .= "*/$INTERVAL * * * *";
    }
    $text .= " root";
    $text .= " $PROGNAME sync --source $job->{source} --dest $job->{dest}";
    $text .= " --name $job->{name} --maxsnap $job->{maxsnap}";
    $text .= " --dest-maxsnap $job->{dest_maxsnap}" if defined($job->{dest_maxsnap});
    $text .= " --limit $job->{limit}" if $job->{limit};
    $text .= " --method $job->{method}";
    $text .= " --verbose" if $job->{verbose};
    $text .= " --source-user $job->{source_user}";
    $text .= " --dest-user $job->{dest_user}";
    $text .= " --prepend-storage-id" if $job->{prepend_storage_id};
    $text .= " --compressed" if $job->{compressed};
    $text .= " --properties" if $job->{properties};
    $text .= " --dest-config-path $job->{dest_config_path}" if $job->{dest_config_path};
    $text .= "\n";

    return $text;
}

# Return a formatted table of all jobs found in the cron file, one row per job,
# with the columns SOURCE, NAME, STATE, LAST SYNC, TYPE and CON. State, last sync
# and type come from the state file, the connection method from the cron file.
sub list {

    my $cfg = read_cron();

    my $row_format = "%-25s%-25s%-10s%-20s%-6s%-5s\n";

    my $list = sprintf($row_format, "SOURCE", "NAME", "STATE", "LAST SYNC", "TYPE", "CON");

    my $states = read_state();
    foreach my $source (sort keys %{$cfg}) {
        foreach my $name (sort keys %{ $cfg->{$source} }) {
            my $state = $states->{$source}->{$name};

            $list .= sprintf(
                $row_format,
                cut_target_width($source, 25),
                cut_target_width($name, 25),
                $state->{state},
                $state->{lsync},
                $state->{vm_type} // "undef",
                $cfg->{$source}->{$name}->{method},
            );
        }
    }

    return $list;
}

# Determine the guest type of $target->{vmid} by looking for its config file.
# For a remote target the file is probed with '/bin/ls' via ssh as
# $user@$target->{ip}, otherwise with a local file test.
# Returns "qemu" or "lxc", or undef if the target has no vmid or no config file
# was found.
sub vm_exists {
    my ($target, $user) = @_;

    return undef if !defined($target->{vmid});

    my $conf_fn = "$target->{vmid}.conf";

    # checked in this order, the first hit wins
    my @candidates = (
        ["qemu", "$QEMU_CONF/$conf_fn"],
        ["lxc", "$LXC_CONF/$conf_fn"],
    );

    for my $candidate (@candidates) {
        my ($type, $conf_path) = @{$candidate};

        if ($target->{ip}) {
            my @cmd = ('ssh', "$user\@$target->{ip}", '--', '/bin/ls', $conf_path);
            return $type if eval { run_cmd(\@cmd) };
        } else {
            return $type if -f $conf_path;
        }
    }

    return undef;
}

# Create a new sync job from the parsed command line parameters in $param.
#
# While holding the cron/state lock this copies the ssh key to remote hosts,
# verifies that the destination (and a dataset source) exists, determines the
# guest type for a VMID source, refuses to overwrite an existing job and finally
# records the job in the cron file and the state file.
# Unless $param->{skip} is set, a first sync is started afterwards; if that sync
# fails the job is removed again and the error is printed.
sub init {
    my ($param) = @_;

    locked(
        "$CONFIG_PATH/cron_and_state.lock",
        sub {
            my $cfg = read_cron();

            my $job = param_to_job($param);

            # a new job starts out as never synced
            $job->{state} = "ok";
            $job->{lsync} = 0;

            my $source = parse_target($param->{source});
            my $dest = parse_target($param->{dest});

            # install root's public key on the remote side(s)
            if (my $ip = $dest->{ip}) {
                run_cmd([
                    'ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "$param->{dest_user}\@$ip",
                ]);
            }

            if (my $ip = $source->{ip}) {
                run_cmd(
                    [
                        'ssh-copy-id',
                        '-i',
                        '/root/.ssh/id_rsa.pub',
                        "$param->{source_user}\@$ip",
                    ],
                );
            }

            die "Pool $dest->{all} does not exist\n"
                if !check_dataset_exists($dest->{all}, $dest->{ip}, $param->{dest_user});

            # a source given as dataset (not as VMID) has to exist as well
            if (!defined($source->{vmid})) {
                die "Pool $source->{all} does not exist\n"
                    if !check_dataset_exists($source->{all}, $source->{ip}, $param->{source_user});
            }

            # undef unless the source is a VMID with an existing guest config
            my $vm_type = vm_exists($source, $param->{source_user});
            $job->{vm_type} = $vm_type;
            $source->{vm_type} = $vm_type;

            die "VM $source->{vmid} doesn't exist\n" if $source->{vmid} && !$vm_type;

            die "Config already exists\n" if $cfg->{ $job->{source} }->{ $job->{name} };

            #check if vm has zfs disks if not die;
            get_disks($source, $param->{source_user}) if $source->{vmid};

            update_cron($job);
            update_state($job);
        },
    ); #cron and state lock

    return if $param->{skip};

    # initial sync; on failure the freshly created job is removed again
    eval { sync($param) };
    if (my $err = $@) {
        destroy_job($param);
        print $err;
    }
}

# Look up the job identified by $param->{source} and $param->{name} in the cron
# file and return it, completed with its name, source and persisted state.
# Dies if there is no such job.
sub get_job {
    my ($param) = @_;

    my $cfg = read_cron();

    my $job = $cfg->{ $param->{source} }->{ $param->{name} };
    die "Job  with source $param->{source} and name $param->{name} does not exist\n" if !$job;

    $job->{name} = $param->{name};
    $job->{source} = $param->{source};

    return add_state_to_job($job);
}

# Remove the job identified by $param from the cron file and the state file.
# Both updates happen while holding the cron/state lock; dies if the job does
# not exist.
sub destroy_job {
    my ($param) = @_;

    my $remove_job = sub {
        my $job = get_job($param);

        # the state "del" makes both update functions drop the job
        $job->{state} = "del";

        update_cron($job);
        update_state($job);
    };

    locked("$CONFIG_PATH/cron_and_state.lock", $remove_job);
}

# Build an identifier for the process $pid of the form "pid:starttime:boot_id".
#
# starttime is field 22 of /proc/<pid>/stat, boot_id is read from
# /proc/sys/kernel/random/boot_id. Dies if either file cannot be read or the stat
# line cannot be parsed.
sub get_instance_id {
    my ($pid) = @_;

    my $stat = read_file("/proc/$pid/stat", 1)
        or die "unable to read process stats\n";
    my $boot_id = read_file("/proc/sys/kernel/random/boot_id", 1)
        or die "unable to read boot ID\n";

    # Field 2 (comm) is enclosed in parentheses and may itself contain spaces or
    # parentheses, so the line must not simply be split on whitespace. Everything
    # up to the LAST ')' belongs to pid and comm; the numbered fields follow.
    my ($fields_after_comm) = $stat =~ m/^.*\)\s+(.*)$/s
        or die "unable to parse process stats\n";

    # the first remaining field is field 3 (state), so field 22 is at index 19
    my @fields = split(/\s+/, $fields_after_comm);
    my $starttime = $fields[19];
    die "unable to parse process stats\n" if !defined($starttime);

    chomp($boot_id);

    return "${pid}:${starttime}:${boot_id}";
}

# Check whether the process described by $instance_id is still running.
# The pid is taken from the part in front of the first ':' and the instance ID of
# that pid is computed again; the result is true only if both IDs are equal.
# Returns 0 for an undefined or malformed $instance_id.
sub instance_exists {
    my ($instance_id) = @_;

    return 0 if !defined($instance_id);
    return 0 if $instance_id !~ m/^([1-9][0-9]*):/;

    my $pid = $1;

    # get_instance_id() dies when the process is gone
    my $actual_id = eval { get_instance_id($pid); };

    return defined($actual_id) && $actual_id eq $instance_id;
}

# Run one synchronisation for the job described by $param.
#
# Overview:
#   1. Under the cron/state lock: if the job is configured, refuse to start when
#      another live instance already has it in state "syncing" or "waiting",
#      otherwise mark it "waiting" for this instance.
#   2. Under the sync lock: re-read the job, mark it "syncing", then sync either
#      every disk of the source guest (plus its config file) or the single
#      source dataset.
#   3. Record the outcome in the job state: "error" on failure (the error is
#      re-thrown), otherwise "ok" together with the sync date.
# A job that is not present in the cron file is synced as well; the state
# bookkeeping is simply skipped in that case.
sub sync {
    my ($param) = @_;

    my $job;

    locked(
        "$CONFIG_PATH/cron_and_state.lock",
        sub {
            # get_job() dies for an unknown job; $job then stays undefined
            eval { $job = get_job($param) };

            if ($job) {
                my $state = $job->{state} // 'ok';
                # a state left behind by a process that no longer exists is ignored
                $state = 'ok' if !instance_exists($job->{instance_id});

                if ($state eq "syncing" || $state eq "waiting") {
                    die
                        "Job --source $param->{source} --name $param->{name} is already scheduled to sync\n";
                }

                $job->{state} = "waiting";
                $job->{instance_id} = $INSTANCE_ID;

                update_state($job);
            }
        },
    );

    locked(
        "$CONFIG_PATH/sync.lock",
        sub {

            # time stamp used for the snapshot name and recorded as last sync
            my $date = get_date();

            my $dest;
            my $source;
            my $vm_type;

            locked(
                "$CONFIG_PATH/cron_and_state.lock",
                sub {
                    #job might've changed while we waited for the sync lock, but we can be sure it's not syncing
                    eval { $job = get_job($param); };

                    if ($job && defined($job->{state}) && $job->{state} eq "stopped") {
                        die
                            "Job --source $param->{source} --name $param->{name} has been disabled\n";
                    }

                    $dest = parse_target($param->{dest});
                    $source = parse_target($param->{source});

                    $vm_type = vm_exists($source, $param->{source_user});
                    $source->{vm_type} = $vm_type;

                    if ($job) {
                        $job->{state} = "syncing";
                        $job->{vm_type} = $vm_type if !$job->{vm_type};
                        update_state($job);
                    }
                },
            ); #cron and state lock

            # Sync a single dataset ($source->{all}): collect the old snapshots on
            # both sides, create the new source snapshot, send it, and finally
            # destroy the snapshots that exceed the configured maximum.
            my $sync_path = sub {
                my ($source, $dest, $job, $param, $date) = @_;

                my $dest_dataset = target_dataset($source, $dest);

                # the destination may use its own snapshot limit
                ($dest->{old_snap}, $dest->{last_snap}) = snapshot_get(
                    $dest_dataset,
                    $param->{dest_maxsnap} // $param->{maxsnap},
                    $param->{name},
                    $dest->{ip},
                    $param->{dest_user},
                );

                ($source->{old_snap}) = snapshot_get(
                    $source->{all},
                    $param->{maxsnap},
                    $param->{name},
                    $source->{ip},
                    $param->{source_user},
                );

                # make sure the parent dataset named after the storage ID exists
                prepare_prepended_target($source, $dest, $param->{dest_user})
                    if defined($dest->{prepend});

                snapshot_add(
                    $source,
                    $dest,
                    $param->{name},
                    $date,
                    $param->{source_user},
                    $param->{dest_user},
                );

                send_image($source, $dest, $param);

                # clean up only after the new snapshot has been transferred
                for my $old_snap (@{ $source->{old_snap} }) {
                    snapshot_destroy(
                        $source->{all}, $old_snap, $source->{ip}, $param->{source_user},
                    );
                }

                for my $old_snap (@{ $dest->{old_snap} }) {
                    snapshot_destroy($dest_dataset, $old_snap, $dest->{ip},
                        $param->{dest_user});
                }
            };

            eval {
                if ($source->{vmid}) {
                    # guest source: sync each of its disks, then the config file
                    die "VM $source->{vmid} doesn't exist\n" if !$vm_type;
                    die "source-user has to be root for syncing VMs\n"
                        if ($param->{source_user} ne "root");
                    my $disks = get_disks($source, $param->{source_user});

                    foreach my $disk (sort keys %{$disks}) {
                        # point $source at the dataset of the current disk
                        $source->{all} = $disks->{$disk}->{all};
                        $source->{pool} = $disks->{$disk}->{pool};
                        $source->{path} = $disks->{$disk}->{path} if $disks->{$disk}->{path};
                        $source->{last_part} = $disks->{$disk}->{last_part};

                        $dest->{prepend} = $disks->{$disk}->{storage_id}
                            if $param->{prepend_storage_id};

                        &$sync_path($source, $dest, $job, $param, $date);
                    }
                    if ($param->{method} eq "ssh" && ($source->{ip} || $dest->{ip})) {
                        send_config(
                            $source,
                            $dest,
                            'ssh',
                            $param->{source_user},
                            $param->{dest_user},
                            $param->{dest_config_path},
                        );
                    } else {
                        send_config(
                            $source,
                            $dest,
                            'local',
                            $param->{source_user},
                            $param->{dest_user},
                            $param->{dest_config_path},
                        );
                    }
                } else {
                    # plain dataset source
                    &$sync_path($source, $dest, $job, $param, $date);
                }
            };
            if (my $err = $@) {
                # record the failure in the job state, then re-throw the error
                locked(
                    "$CONFIG_PATH/cron_and_state.lock",
                    sub {
                        eval { $job = get_job($param); };
                        if ($job) {
                            $job->{state} = "error";
                            delete $job->{instance_id};
                            update_state($job);
                        }
                    },
                );
                print
                    "Job --source $param->{source} --name $param->{name} got an ERROR!!!\nERROR Message:\n";
                die "$err\n";
            }

            # success: store the result and release the instance marker
            locked(
                "$CONFIG_PATH/cron_and_state.lock",
                sub {
                    eval { $job = get_job($param); };
                    if ($job) {
                        # a job that was stopped in the meantime stays stopped
                        if (defined($job->{state}) && $job->{state} eq "stopped") {
                            $job->{state} = "stopped";
                        } else {
                            $job->{state} = "ok";
                        }
                        $job->{lsync} = $date;
                        delete $job->{instance_id};
                        update_state($job);
                    }
                },
            );
        },
    ); #sync lock
}

# List the snapshots of $dataset (newest first), locally or via ssh as $user@$ip.
#
# Returns a two element list:
#   - array reference with the replication snapshots of job $name that have to
#     be removed to stay within $max_snap (a $max_snap <= 0 means no limit)
#   - name of the first snapshot in the listing
# Returns undef if the listing fails (e.g. the dataset does not exist yet) or no
# snapshot was found.
sub snapshot_get {
    my ($dataset, $max_snap, $name, $ip, $user) = @_;

    my @cmd = ('zfs', 'list', '-r', '-t', 'snapshot', '-Ho', 'name', '-S', 'creation', $dataset);
    unshift @cmd, 'ssh', "$user\@$ip", '--' if $ip;

    my $raw = eval { run_cmd(\@cmd) };
    if (my $erro = $@) { #this means the volume doesn't exist on dest yet
        return undef;
    }

    my $last_snap;
    my @old_snaps;
    my $seen = 0;

    for my $line (split(/\n/, $raw // '')) {
        # remember the snapshot part of the first (newest) entry
        if (!$last_snap && $line =~ m/@(.*)$/) {
            $last_snap = $1;
        }

        next if $line !~ m/(rep_\Q${name}\E_\d{4}-\d{2}-\d{2}_\d{2}:\d{2}:\d{2})$/;
        my $snap = $1;

        # interpreted as infinity
        last if $max_snap <= 0;

        $seen++;
        push @old_snaps, $snap if $seen >= $max_snap;
    }

    return (\@old_snaps, $last_snap) if $last_snap;

    return undef;
}

# Create the replication snapshot "rep_<name>_<date>" on $source->{all}, locally
# or via ssh as $source_user@$source->{ip}, and remember its name in
# $source->{new_snap}. If creating the snapshot fails, its removal is attempted
# and the error is re-thrown.
sub snapshot_add {
    my ($source, $dest, $name, $date, $source_user, $dest_user) = @_;

    my $snap_name = "rep_${name}_${date}";

    $source->{new_snap} = $snap_name;

    my @cmd = ('zfs', 'snapshot', "$source->{all}\@$snap_name");
    unshift @cmd, 'ssh', "$source_user\@$source->{ip}", '--' if $source->{ip};

    eval { run_cmd(\@cmd); };

    if (my $err = $@) {
        snapshot_destroy($source->{all}, $snap_name, $source->{ip}, $source_user);
        die "$err\n";
    }
}

# Fetch the configuration of the guest $target->{vmid} ('qm config' for qemu,
# 'pct config' for lxc; via ssh as $user@$target->{ip} for a remote guest) and
# return the disks found in it by parse_disks().
# Dies with "VM Type unknown" for any other $target->{vm_type}.
sub get_disks {
    my ($target, $user) = @_;

    my @remote = $target->{ip} ? ('ssh', "$user\@$target->{ip}", '--') : ();

    my $vm_type = $target->{vm_type};
    my $config_tool =
        $vm_type eq 'qemu' ? 'qm'
        : $vm_type eq 'lxc' ? 'pct'
        :                     die "VM Type unknown\n";

    my $config = run_cmd([@remote, $config_tool, 'config', $target->{vmid}]);

    return parse_disks($config, $target->{ip}, $target->{vm_type}, $user);
}

# Execute $cmd through the shell and return its output (stdout and stderr
# combined, trailing newline removed).
#
# $cmd is either a ready-made command string or an array reference. Plain array
# elements are shell-quoted; elements that are scalar references (e.g. \'|') are
# inserted verbatim. Dies with the command line and its output if the exit
# status is not zero. With $DEBUG set, the command and its output are dumped.
sub run_cmd {
    my ($cmd) = @_;

    if ($DEBUG) {
        print "Start CMD\n";
        print Dumper($cmd);
    }

    if (ref($cmd) eq 'ARRAY') {
        my @words = map { ref($_) ? $$_ : shell_quote($_) } @$cmd;
        $cmd = join(' ', @words);
    }

    my $output = `$cmd 2>&1`;

    die "COMMAND:\n\t$cmd\nGET ERROR:\n\t$output" if $? != 0;

    chomp($output);

    if ($DEBUG) {
        print Dumper($output);
        print "END CMD\n";
    }

    return $output;
}

# Extract the ZFS backed disks from a guest configuration.
#
# $text    - output of 'qm config' / 'pct config'
# $ip      - host of the guest, false for a local guest
# $vm_type - 'qemu' or 'lxc'
# $user    - ssh user for a remote guest
#
# Returns a hash reference indexed by a running number (0, 1, ...) whose entries
# contain storage_id, pool, path (optional), last_part and all (the complete
# dataset name). Lines that are skipped: CD-ROM drives, keys not matching
# $DISK_KEY_RE, qemu disks with backup explicitly disabled and lxc mount points
# without backup explicitly enabled. The location of every volume is resolved
# with 'pvesm path'. Dies if that path has an unexpected form or if no disk is
# left at the end.
sub parse_disks {
    my ($text, $ip, $vm_type, $user) = @_;

    my $disks;

    my $num = 0;
    # consume $text line by line
    while ($text && $text =~ s/^(.*?)(\n|$)//) {
        my $line = $1;

        next if $line =~ /media=cdrom/;
        next if $line !~ m/$DISK_KEY_RE/;

        # QEMU: disks are synced unless backup is explicitly disabled
        next if $vm_type eq 'qemu' && ($line =~ m/backup=(?i:0|no|off|false)/);

        # LXC: mount points are only synced if backup is explicitly enabled.
        # \d+ so that mount points with more than one digit (mp10, ...) are
        # covered by this rule as well.
        next
            if $vm_type eq 'lxc' && ($line =~ m/^mp\d+:/) && ($line !~ m/backup=(?i:1|yes|on|true)/);

        my $disk = undef;
        my $stor = undef;
        if ($line =~ m/$DISK_KEY_RE(.*)$/) {
            my @parameter = split(/,/, $1);

            # the first option of the form [file=|volume=]<storage>:<volume> names the disk
            foreach my $opt (@parameter) {
                if ($opt =~ m/^(?:file=|volume=)?([^:]+):([A-Za-z0-9\-]+)$/) {
                    $disk = $2;
                    $stor = $1;
                    last;
                }
            }
        }
        if (!defined($disk) || !defined($stor)) {
            print "Disk: \"$line\" has no valid zfs dataset format and will be skipped\n";
            next;
        }

        my $cmd = [];
        push @$cmd, 'ssh', "$user\@$ip", '--' if $ip;
        push @$cmd, 'pvesm', 'path', "$stor:$disk";
        my $path = run_cmd($cmd);

        die "Get no path from pvesm path $stor:$disk\n" if !$path;

        $disks->{$num}->{storage_id} = $stor;

        if ($vm_type eq 'qemu' && $path =~ m/^\/dev\/zvol\/(\w+.*)(\/$disk)$/) {

            # /dev/zvol/<pool>[/<path>]/<disk>
            my @array = split('/', $1);
            $disks->{$num}->{pool} = shift(@array);
            $disks->{$num}->{all} = $disks->{$num}->{pool};
            if (0 < @array) {
                $disks->{$num}->{path} = join('/', @array);
                $disks->{$num}->{all} .= "\/$disks->{$num}->{path}";
            }
            $disks->{$num}->{last_part} = $disk;
            $disks->{$num}->{all} .= "\/$disk";

            $num++;
        } elsif ($vm_type eq 'lxc' && $path =~ m/^\/(\w+.+)(\/(\w+.*))*(\/$disk)$/) {

            $disks->{$num}->{pool} = $1;
            $disks->{$num}->{all} = $disks->{$num}->{pool};

            if ($2) {
                $disks->{$num}->{path} = $3;
                $disks->{$num}->{all} .= "\/$disks->{$num}->{path}";
            }

            $disks->{$num}->{last_part} = $disk;
            $disks->{$num}->{all} .= "\/$disk";

            $num++;

        } else {
            die "unexpected path '$path'\n";
        }
    }

    die "Guest does not include any ZFS volumes (or all are excluded by the backup flag).\n"
        if !$disks->{0};
    return $disks;
}

# Name of the dataset on the destination that corresponds to the source dataset:
# $dest->{all}, followed by $dest->{prepend} (if defined) and the last component
# of the source (if any), joined by single slashes.
sub target_dataset {
    my ($source, $dest) = @_;

    my @components = ("$dest->{all}");
    push @components, $dest->{prepend} if defined($dest->{prepend});
    push @components, $source->{last_part} if $source->{last_part};

    my $target = join('/', @components);
    $target =~ s!/+!/!g;

    return $target;
}

# Make sure the parent dataset "$dest->{all}/$dest->{prepend}" of the actual
# target exists on the destination, creating it if necessary.
# Dies when called without $dest->{prepend} or without $source->{last_part}.
sub prepare_prepended_target {
    my ($source, $dest, $dest_user) = @_;

    defined($dest->{prepend})
        or die "internal error - not a prepended target\n";

    # The parent dataset shouldn't be the actual target.
    $source->{last_part}
        or die "internal error - no last_part for source\n";

    (my $target = "$dest->{all}/$dest->{prepend}") =~ s!/+!/!g;

    return if check_dataset_exists($target, $dest->{ip}, $dest_user);

    create_file_system($target, $dest->{ip}, $dest_user);
}

# Destroy the snapshot "$dataset@$snap", locally or via ssh as $user@$ip.
# A failure is only reported as a warning, it is not fatal.
sub snapshot_destroy {
    my ($dataset, $snap, $ip, $user) = @_;

    my @cmd = ('zfs', 'destroy', "$dataset\@$snap");
    unshift @cmd, 'ssh', "$user\@$ip", '--' if $ip;

    eval { run_cmd(\@cmd) };
    if (my $erro = $@) {
        warn "WARN: $erro";
    }
}

# Check whether the snapshot needed for an incremental sync, i.e.
# "$source->{all}@$dest->{last_snap}", exists on the source side (queried via ssh
# as $source_user@$source->{ip} for a remote source).
# Returns 1 if it can be listed; otherwise warns and returns undef.
sub snapshot_exist {
    my ($source, $dest, $method, $source_user) = @_;

    my $snapshot = "$source->{all}\@$dest->{last_snap}";

    my @cmd = ('zfs', 'list', '-rt', 'snapshot', '-Ho', 'name', $snapshot);
    unshift @cmd, 'ssh', "$source_user\@$source->{ip}", '--' if $source->{ip};

    eval { run_cmd(\@cmd) };
    if (my $erro = $@) {
        warn "WARN: $erro";
        return undef;
    }

    return 1;
}

# Transfer the new snapshot $source->{new_snap} of $source->{all} to the target
# dataset with a 'zfs send | [cstream |] zfs recv' pipeline.
#
# The send is incremental when the last snapshot of the destination also exists
# on the source. $param controls the send flags (compressed, properties,
# verbose), the bandwidth limit and the ssh users. If the transfer fails, the new
# source snapshot is removed again and the error is re-thrown.
sub send_image {
    my ($source, $dest, $param) = @_;

    my @ssh = ('ssh', '-o', 'BatchMode=yes');

    # sending side
    my @send;
    push @send, @ssh, "$param->{source_user}\@$source->{ip}", '--' if $source->{ip};
    push @send, 'zfs', 'send';
    # -L has no effect if dataset never had large recordsize
    push @send, '-L', '-c' if $param->{compressed};
    push @send, '-p' if $param->{properties};
    push @send, '-v' if $param->{verbose};

    my $incremental = $dest->{last_snap}
        && snapshot_exist($source, $dest, $param->{method}, $param->{source_user});
    push @send, '-i', "$source->{all}\@$dest->{last_snap}" if $incremental;

    push @send, '--', "$source->{all}\@$source->{new_snap}";

    # optional bandwidth limit; scalar references are passed to the shell unquoted
    my @throttle;
    if ($param->{limit}) {
        my $bwl = $param->{limit} * 1024;
        @throttle = (\'|', 'cstream', '-t', $bwl);
    }

    # receiving side
    my $target = target_dataset($source, $dest);

    my @receive = (\'|');
    push @receive, @ssh, "$param->{dest_user}\@$dest->{ip}", '--' if $dest->{ip};
    push @receive, 'zfs', 'recv', '-F', '--', "$target";

    eval { run_cmd([@send, @throttle, @receive]) };

    if (my $erro = $@) {
        snapshot_destroy($source->{all}, $source->{new_snap}, $source->{ip}, $param->{source_user});
        die $erro;
    }
}

# Copy the guest configuration of $source->{vmid} into the config directory of
# the destination and, for the 'ssh' method, remove the config copies that
# belong to the snapshots listed in $dest->{old_snap}.
#
# The copy is stored as "<vmid>.conf.<vm_type>.<new_snap>" below
# $dest_config_path (falling back to $CONFIG_PATH), with $dest->{last_part}
# appended as a sub-directory if it is set.
#
# $method selects the transport:
#   'ssh'   - scp from/to whichever side has an IP set; nothing is copied if
#             neither side has one
#   'local' - plain cp on this host
# Any other value makes this a no-op. Errors raised by run_cmd are not caught
# here.
sub send_config {
    my ($source, $dest, $method, $source_user, $dest_user, $dest_config_path) = @_;

    # qemu guests and containers keep their configs in different directories
    my $source_target =
        $source->{vm_type} eq 'qemu'
        ? "$QEMU_CONF/$source->{vmid}.conf"
        : "$LXC_CONF/$source->{vmid}.conf";
    my $dest_target_new = "$source->{vmid}.conf.$source->{vm_type}.$source->{new_snap}";

    my $config_dir = $dest_config_path // $CONFIG_PATH;
    $config_dir .= "/$dest->{last_part}" if $dest->{last_part};

    $dest_target_new = $config_dir . '/' . $dest_target_new;

    if ($method eq 'ssh') {
        # the IPs are put in brackets for scp
        if ($dest->{ip} && $source->{ip}) {
            # remote to remote
            run_cmd(['ssh', "$dest_user\@$dest->{ip}", '--', 'mkdir', '-p', '--', $config_dir]);
            run_cmd(
                [
                    'scp',
                    '--',
                    "$source_user\@[$source->{ip}]:$source_target",
                    "$dest_user\@[$dest->{ip}]:$dest_target_new",
                ],
            );
        } elsif ($dest->{ip}) {
            # local to remote
            run_cmd(['ssh', "$dest_user\@$dest->{ip}", '--', 'mkdir', '-p', '--', $config_dir]);
            run_cmd(['scp', '--', $source_target, "$dest_user\@[$dest->{ip}]:$dest_target_new"]);
        } elsif ($source->{ip}) {
            # remote to local
            run_cmd(['mkdir', '-p', '--', $config_dir]);
            run_cmd(['scp', '--', "$source_user\@[$source->{ip}]:$source_target", $dest_target_new]
            );
        }

        # drop the config copies that belong to the snapshots in old_snap
        for my $old_snap (@{ $dest->{old_snap} }) {
            my $dest_target_old =
                "${config_dir}/$source->{vmid}.conf.$source->{vm_type}.${old_snap}";
            if ($dest->{ip}) {
                run_cmd(
                    [
                        'ssh',
                        "$dest_user\@$dest->{ip}",
                        '--',
                        'rm',
                        '-f',
                        '--',
                        $dest_target_old,
                    ],
                );
            } else {
                run_cmd(['rm', '-f', '--', $dest_target_old]);
            }
        }
    } elsif ($method eq 'local') {
        run_cmd(['mkdir', '-p', '--', $config_dir]);
        # '--' keeps a path with a leading '-' (possible via a custom config
        # path) from being parsed as an option, like for mkdir/scp/rm above
        run_cmd(['cp', '--', $source_target, $dest_target_new]);
    }
}

# Return the current local time formatted as "YYYY-MM-DD_hh:mm:ss".
sub get_date {
    # pick year, month, day, hour, minute and second out of localtime's list
    my ($year, $month, $day, $hour, $minute, $second) = (localtime(time))[5, 4, 3, 2, 1, 0];

    # localtime counts years from 1900 and months from 0
    return sprintf(
        "%04d-%02d-%02d_%02d:%02d:%02d",
        $year + 1900, $month + 1, $day, $hour, $minute, $second,
    );
}

# Build the table printed by the 'status' command: a header line followed by
# one line per job from read_cron() (sorted by source, then by job name)
# showing the state recorded for it in read_state().
# Returns the table as a single string.
sub status {
    my $jobs = read_cron();

    my $table = sprintf("%-25s%-25s%-10s\n", "SOURCE", "NAME", "STATUS");

    my $states = read_state();

    for my $source (sort keys %$jobs) {
        for my $job_name (sort keys %{ $jobs->{$source} }) {
            # both name columns are cut to their fixed width of 25 characters
            $table .= join(
                '',
                sprintf("%-25s", cut_target_width($source, 25)),
                sprintf("%-25s", cut_target_width($job_name, 25)),
                "$states->{$source}->{$job_name}->{state}\n",
            );
        }
    }

    return $table;
}

# Set the state of the job selected by $param to "ok" and write it back to
# both the state file and the cron file, all while holding the shared
# cron/state lock.
sub enable_job {
    my ($param) = @_;

    my $mark_ok = sub {
        my $job = get_job($param);
        $job->{state} = "ok";
        update_state($job);
        update_cron($job);
    };

    locked("$CONFIG_PATH/cron_and_state.lock", $mark_ok);
}

# Set the state of the job selected by $param to "stopped" and write it back
# to both the state file and the cron file, all while holding the shared
# cron/state lock.
sub disable_job {
    my ($param) = @_;

    my $mark_stopped = sub {
        my $job = get_job($param);
        $job->{state} = "stopped";
        update_state($job);
        update_cron($job);
    };

    locked("$CONFIG_PATH/cron_and_state.lock", $mark_stopped);
}

# Help text for every command, keyed by the command name. Besides being shown
# to the user, the keys double as the list of known commands (a command without
# an entry is rejected) and print_pod() renders keys and texts into the POD.
# The option lines inside the texts are indented with tabs.
my $cmd_help = {
    destroy => qq{
$PROGNAME destroy --source <string> [OPTIONS]

    Remove a sync Job from the scheduler

	--name      string
		The name of the sync job, if not set 'default' is used.

	--source    string
		The source can be a <VMID> or [IP:]<ZFSPool>[/Path]
    },
    create => qq{
$PROGNAME create --dest <string> --source <string> [OPTIONS]

    Create a new sync-job

	--dest      string
		The destination target is like [IP:]<Pool>[/Path]

	--dest-user string
		The name of the user on the destination target, root by default

	--limit     integer
		Maximal sync speed in kBytes/s, default is unlimited

	--maxsnap   integer
		The number of snapshots to keep until older ones are erased.
		The default is 1, use 0 for unlimited.

	--dest-maxsnap   integer
		Override maxsnap for the destination dataset.

	--name      string
		The name of the sync job, if not set it is default

	--prepend-storage-id
		If specified, prepend the storage ID to the destination's path(s).

	--skip
		If specified, skip the first sync.

	--source    string
		The source can be a <VMID> or [IP:]<ZFSPool>[/Path]

	--source-user    string
		The (ssh) user-name on the source target, root by default

	--compressed
		If specified, send data without decompressing first. If features lz4_compress,
		zstd_compress or large_blocks are in use by the source, they need to be enabled on
		the target as well.

	--properties
		If specified, include the dataset's properties in the stream.

	--dest-config-path    string
		Specifies a custom config path on the destination target.
		The default is /var/lib/pve-zsync
    },
    sync => qq{
$PROGNAME sync --dest <string> --source <string> [OPTIONS]

    Trigger one sync.

	--dest      string
		The destination target is like [IP:]<Pool>[/Path]

	--dest-user string
		The (ssh) user-name on the destination target, root by default

	--limit     integer
		The maximal sync speed in kBytes/s, default is unlimited

	--maxsnap   integer
		The number of snapshots to keep until older ones are erased.
		The default is 1, use 0 for unlimited.

	--dest-maxsnap   integer
		Override maxsnap for the destination dataset.

	--name      string
		The name of the sync job, if not set it is 'default'.
		It is only necessary if scheduler already contains this source.

	--prepend-storage-id
		If specified, prepend the storage ID to the destination's path(s).

	--source    string
		The source can either be a <VMID> or [IP:]<ZFSPool>[/Path]

	--source-user    string
		The name of the user on the source target, root by default

	--verbose
		If specified, print out the sync progress.

	--compressed
		If specified, send data without decompressing first. If features lz4_compress,
		zstd_compress or large_blocks are in use by the source, they need to be enabled on
		the target as well.

	--properties
		If specified, include the dataset's properties in the stream.

	--dest-config-path    string
		Specifies a custom config path on the destination target.
		The default is /var/lib/pve-zsync
    },
    list => qq{
$PROGNAME list

	Get a List of all scheduled Sync Jobs
    },
    status => qq{
$PROGNAME status

	Get the status of all scheduled Sync Jobs
    },
    help => qq{
$PROGNAME help <cmd> [OPTIONS]

    Get help about specified command.

	<cmd>      string
		Command name to get help about.

	--verbose
		Verbose output format.
    },
    enable => qq{
$PROGNAME enable --source <string> [OPTIONS]

    Enable a sync-job and reset all job-errors, if any.

	--name      string
		name of the sync job, if not set it is default

	--source    string
		the source can be a <VMID> or [IP:]<ZFSPool>[/Path]
    },
    disable => qq{
$PROGNAME disable --source <string> [OPTIONS]

    Disables (pauses) a sync-job

	--name      string
		name of the sync-job, if not set it is default

	--source    string
		the source can be a <VMID> or [IP:]<ZFSPool>[/Path]
    },
    printpod => "$PROGNAME printpod\n\n\tinternal command",

};

# Reject a missing or unknown command before any option parsing happens.
# Only commands that have an entry in $cmd_help are accepted.
if (!$command) {
    usage();
    die "\n";
} elsif (!$cmd_help->{$command}) {
    # terminate the line, usage(1) starts printing right after it
    print "ERROR: unknown command '$command'\n";
    usage(1);
    die "\n";
}

# parse a copy so that @ARGV itself stays untouched for later lookups
my @arg = @ARGV;
my $param = parse_argv(@arg);

# Takes a list of option names and dies with the help text of the current
# command as soon as one of them has no true value in $param.
sub check_params {
    my @required = @_;

    for my $name (@required) {
        next if $param->{$name};
        die "$cmd_help->{$command}\n";
    }
}

# Dispatch to the handler of the requested command. Commands working on a job
# first make sure their mandatory options are set (check_params) and hand the
# given targets to check_target() before doing any work.
if ($command eq 'destroy') {
    check_params(qw(source));

    check_target($param->{source});
    destroy_job($param);

} elsif ($command eq 'sync') {
    check_params(qw(source dest));

    check_target($param->{source});
    check_target($param->{dest});
    sync($param);

} elsif ($command eq 'create') {
    check_params(qw(source dest));

    check_target($param->{source});
    check_target($param->{dest});
    init($param);

} elsif ($command eq 'status') {
    print status();

} elsif ($command eq 'list') {
    print list();

} elsif ($command eq 'help') {
    # the word following 'help' on the command line, if any
    my $help_command = $ARGV[1];

    if ($help_command && $cmd_help->{$help_command}) {
        die "$cmd_help->{$help_command}\n";

    }
    if ($param->{verbose}) {
        # exec only returns on failure; report that instead of silently
        # finishing with a success exit code. The list form avoids the shell.
        exec('man', $PROGNAME)
            or die "ERROR: unable to execute 'man $PROGNAME': $!\n";

    } else {
        usage(1);

    }

} elsif ($command eq 'enable') {
    check_params(qw(source));

    check_target($param->{source});
    enable_job($param);

} elsif ($command eq 'disable') {
    check_params(qw(source));

    check_target($param->{source});
    disable_job($param);

} elsif ($command eq 'printpod') {
    print_pod();
}

# Print the short usage overview to STDOUT. Unless $help is true, the overview
# is preceded by an error line stating that no command was specified.
sub usage {
    my ($help) = @_;

    my @lines = (
        "USAGE:\t$PROGNAME <COMMAND> [ARGS] [OPTIONS]\n",
        "\t$PROGNAME help [<cmd>] [OPTIONS]\n\n",
        "\t$PROGNAME create --dest <string> --source <string> [OPTIONS]\n",
        "\t$PROGNAME destroy --source <string> [OPTIONS]\n",
        "\t$PROGNAME disable --source <string> [OPTIONS]\n",
        "\t$PROGNAME enable --source <string> [OPTIONS]\n",
        "\t$PROGNAME list\n",
        "\t$PROGNAME status\n",
        "\t$PROGNAME sync --dest <string> --source <string> [OPTIONS]\n",
    );
    unshift @lines, "ERROR:\tno command specified\n" if !$help;

    print($_) for @lines;
}

# Hand the given target string to parse_target() and pass its result through.
# NOTE(review): presumably parse_target() dies on a malformed target, which is
# what makes this usable as a check - confirm against its definition.
sub check_target {
    my $target = shift;

    return parse_target($target);
}

# Print the POD source of the manual page to STDOUT. The list of commands and
# the "COMMANDS AND OPTIONS" section are generated from $cmd_help (sorted by
# command name and by help text respectively), the rest is static text.
sub print_pod {

    my $synopsis = join("\n", sort values %$cmd_help);
    my $commands = join(", ", sort keys %$cmd_help);

    # the example command is set off by a blank line, an indented line only
    # starts a verbatim paragraph if it begins a new paragraph
    print <<EOF;
=head1 NAME

pve-zsync - PVE ZFS Storage Sync Tool

=head1 SYNOPSIS

pve-zsync <COMMAND> [ARGS] [OPTIONS]

Where <COMMAND> can be one of: $commands

=head1 DESCRIPTION

The pve-zsync tool can help you to sync your VMs or directories stored on ZFS
between multiple servers.

pve-zsync is able to automatically configure CRON jobs, so that a periodic sync
will be automatically triggered.
The default sync interval is 15 min, if you want to change this value you can
do this in F</etc/cron.d/pve-zsync>. If you need help to configure CRON tabs, see
man crontab.

=head1 COMMANDS AND OPTIONS

$synopsis

=head1 EXAMPLES

Adds a job for syncing the local VM 100 to a remote server's ZFS pool named "tank":

    pve-zsync create --source=100 --dest=192.168.1.2:tank

=head1 IMPORTANT FILES

Cron jobs and config are stored in F</etc/cron.d/pve-zsync>

The VM configuration itself gets copied to the destination machines
F</var/lib/pve-zsync/> path.

=head1 COPYRIGHT AND DISCLAIMER

Copyright (C) 2007-2021 Proxmox Server Solutions GmbH

This program is free software: you can redistribute it and/or modify it under
the terms of the GNU Affero General Public License as published by the Free
Software Foundation, either version 3 of the License, or (at your option) any
later version.

This program is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE. See the GNU Affero General Public License for more
details.

You should have received a copy of the GNU Affero General Public License along
with this program. If not, see <http://www.gnu.org/licenses/>.

EOF
}
