#!/usr/bin/env ruby

# TODO (temporary here, we'll move this into the Github issues once
#       redis-trib initial implementation is completed).
#
# - Make sure that if the rehashing fails in the middle redis-trib will try
#   to recover.
# - When redis-trib performs a cluster check, if it detects a slot move in
#   progress it should prompt the user to continue the move from where it
#   stopped.
# - Gracefully handle Ctrl+C in move_slot to prompt the user if really stop
#   while rehashing, and performing the best cleanup possible if the user
#   forces the quit.
# - When doing "fix" set a global Fix to true, and prompt the user to
#   fix the problem if automatically fixable every time there is something
#   to fix. For instance:
#   1) If there is a node that pretends to receive a slot, or to migrate a
#      slot, but has no entries in that slot, fix it.
#   2) If there is a node having keys in slots that are not owned by it,
#      fix this condition by moving the entries to the owning node.
#   3) Perform more possibly slow tests about the state of the cluster.
#   4) When aborted slot migration is detected, fix it.

require 'rubygems'
require 'redis'

ClusterHashSlots = 16384
MigrateDefaultTimeout = 60000
MigrateDefaultPipeline = 10
RebalanceDefaultThreshold = 2

$verbose = false

def xputs(s)
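    # Pick an ANSI color from the message prefix (">>>", "[ERR...", "[WA...",
    # "[OK...", "[FA..." or "***"); coloring is only applied when TERM is xterm.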
    case s[0..2]
    when ">>>"
        color="29;1"
    when "[ER"
        color="31;1"
    when "[WA"
        color="31;1"
    when "[OK"
        color="32"
    when "[FA","***"
        color="33"
    else
        color=nil
    end

    color = nil if ENV['TERM'] != "xterm"
    print "\033[#{color}m" if color
    print s
    print "\033[0m" if color
    print "\n"
end

class ClusterNode
    def initialize(addr)
        s = addr.split(":")
        if s.length < 2
           puts "Invalid IP or Port (given as #{addr}) - use IP:Port format"
           exit 1
        end
        port = s.pop # removes port from split array
        ip = s.join(":") # if s.length > 1 here, it's IPv6, so restore address
        @r = nil
        @info = {}
        @info[:host] = ip
        @info[:port] = port
        @info[:slots] = {}
        @info[:migrating] = {}
        @info[:importing] = {}
        @info[:replicate] = false
        @dirty = false # True if we need to flush slots info into node.
        @friends = []
    end

    def friends
        @friends
    end

    def slots
        @info[:slots]
    end

    def has_flag?(flag)
        @info[:flags].index(flag)
    end

    def to_s
        "#{@info[:host]}:#{@info[:port]}"
    end

    def connect(o={})
        return if @r
        print "Connecting to node #{self}: " if $verbose
        STDOUT.flush
        begin
            @r = Redis.new(:host => @info[:host], :port => @info[:port], :timeout => 60)
            @r.ping
        rescue
            xputs "[ERR] Sorry, can't connect to node #{self}"
            exit 1 if o[:abort]
            @r = nil
        end
        xputs "OK" if $verbose
    end

    def assert_cluster
        info = @r.info
        if !info["cluster_enabled"] || info["cluster_enabled"].to_i == 0
            xputs "[ERR] Node #{self} is not configured as a cluster node."
            exit 1
        end
    end

    def assert_empty
        if !(@r.cluster("info").split("\r\n").index("cluster_known_nodes:1")) ||
            (@r.info['db0'])
            xputs "[ERR] Node #{self} is not empty. Either the node already knows other nodes (check with CLUSTER NODES) or contains some key in database 0."
            exit 1
        end
    end

    def load_info(o={})
        self.connect
        nodes = @r.cluster("nodes").split("\n")
        nodes.each{|n|
            # name addr flags role ping_sent ping_recv link_status slots
            split = n.split
            name,addr,flags,master_id,ping_sent,ping_recv,config_epoch,link_status = split[0..7]
            slots = split[8..-1]
            info = {
                :name => name,
                :addr => addr,
                :flags => flags.split(","),
                :replicate => master_id,
                :ping_sent => ping_sent.to_i,
                :ping_recv => ping_recv.to_i,
                :link_status => link_status
            }
            info[:replicate] = false if master_id == "-"

            if info[:flags].index("myself")
                @info = @info.merge(info)
                @info[:slots] = {}
                slots.each{|s|
                    if s[0..0] == '['
                        if s.index("->-") # Migrating
                            slot,dst = s[1..-1].split("->-")
                            @info[:migrating][slot.to_i] = dst
                        elsif s.index("-<-") # Importing
                            slot,src = s[1..-1].split("-<-")
                            @info[:importing][slot.to_i] = src
                        end
                    elsif s.index("-")
                        start,stop = s.split("-")
                        self.add_slots((start.to_i)..(stop.to_i))
                    else
                        self.add_slots((s.to_i)..(s.to_i))
                    end
                } if slots
                @dirty = false
                @r.cluster("info").split("\n").each{|e|
                    k,v=e.split(":")
                    k = k.to_sym
                    v.chop!
                    if k != :cluster_state
                        @info[k] = v.to_i
                    else
                        @info[k] = v
                    end
                }
            elsif o[:getfriends]
                @friends << info
            end
        }
    end

    def add_slots(slots)
        slots.each{|s|
            @info[:slots][s] = :new
        }
        @dirty = true
    end

    def set_as_replica(node_id)
        @info[:replicate] = node_id
        @dirty = true
    end

    def flush_node_config
        return if !@dirty
        if @info[:replicate]
            begin
                @r.cluster("replicate",@info[:replicate])
            rescue
                # If the cluster has not joined yet, it is possible that
                # the slave does not know the master node yet. So on errors
                # we return ASAP leaving the dirty flag set, to flush the
                # config later.
                return
            end
        else
            new = []
            @info[:slots].each{|s,val|
                if val == :new
                    new << s
                    @info[:slots][s] = true
                end
            }
            @r.cluster("addslots",*new)
        end
        @dirty = false
    end

    def info_string
        # We want to display the hash slots assigned to this node
        # as ranges, like in: "1-5,8-9,20-25,30"
        #
        # Note: this could be easily written without side effects,
        # we use 'slots' just to split the computation into steps.

        # First step: we want an increasing array of integers
        # for instance: [1,2,3,4,5,8,9,20,21,22,23,24,25,30]
        slots = @info[:slots].keys.sort

        # As we want to aggregate adjacent slots we convert all the
        # slot integers into ranges (with just one element)
        # So we have something like [1..1,2..2, ... and so forth.
        slots.map!{|x| x..x}

        # Finally we group ranges with adjacent elements.
        slots = slots.reduce([]) {|a,b|
            if !a.empty? && b.first == (a[-1].last)+1
                a[0..-2] + [(a[-1].first)..(b.last)]
            else
                a + [b]
            end
        }
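        # For example (illustrative): [1..1,2..2,3..3,8..8] folds into
        # [1..3,8..8].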

        # Now our task is easy, we just convert ranges with just one
        # element into a number, and a real range into a start-end format.
        # Finally we join the array using the comma as separator.
        slots = slots.map{|x|
            x.count == 1 ? x.first.to_s : "#{x.first}-#{x.last}"
        }.join(",")

        role = self.has_flag?("master") ? "M" : "S"

        if self.info[:replicate] and @dirty
            is = "S: #{self.info[:name]} #{self.to_s}"
        else
            is = "#{role}: #{self.info[:name]} #{self.to_s}\n"+
            "   slots:#{slots} (#{self.slots.length} slots) "+
            "#{(self.info[:flags]-["myself"]).join(",")}"
        end
        if self.info[:replicate]
            is += "\n   replicates #{info[:replicate]}"
        elsif self.has_flag?("master") && self.info[:replicas]
            is += "\n   #{info[:replicas].length} additional replica(s)"
        end
        is
    end

    # Return a single string representing nodes and associated slots.
    # TODO: remove slaves from config when slaves will be handled
    # by Redis Cluster.
    def get_config_signature
        config = []
        @r.cluster("nodes").each_line{|l|
            s = l.split
            slots = s[8..-1].select {|x| x[0..0] != "["}
            next if slots.length == 0
            config << s[0]+":"+(slots.sort.join(","))
        }
        config.sort.join("|")
    end

    def info
        @info
    end

    def is_dirty?
        @dirty
    end

    def r
        @r
    end
end

class RedisTrib
    def initialize
        @nodes = []
        @fix = false
        @errors = []
        @timeout = MigrateDefaultTimeout
    end

    def check_arity(req_args, num_args)
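        # req_args > 0 requires exactly that many arguments, while a negative
        # req_args requires at least req_args.abs of them (for example,
        # check_arity(-2,argc) accepts any argc >= 2).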
        if ((req_args > 0 and num_args != req_args) ||
           (req_args < 0 and num_args < req_args.abs))
           xputs "[ERR] Wrong number of arguments for specified sub command"
           exit 1
        end
    end

    def add_node(node)
        @nodes << node
    end

    def reset_nodes
        @nodes = []
    end

    def cluster_error(msg)
        @errors << msg
        xputs msg
    end

    # Return the node with the specified ID or Nil.
    def get_node_by_name(name)
        @nodes.each{|n|
            return n if n.info[:name] == name.downcase
        }
        return nil
    end

    # Like get_node_by_name but the specified name can be just the first
    # part of the node ID as long as the prefix is unique across the
    # cluster.
    def get_node_by_abbreviated_name(name)
        l = name.length
        candidates = []
        @nodes.each{|n|
            if n.info[:name][0...l] == name.downcase
                candidates << n
            end
        }
        return nil if candidates.length != 1
        candidates[0]
    end

    # This function returns the master that has the least number of replicas
    # in the cluster. If there are multiple masters tied for the smallest
    # number of replicas, an arbitrary one among them is returned.
    def get_master_with_least_replicas
        masters = @nodes.select{|n| n.has_flag? "master"}
        sorted = masters.sort{|a,b|
            a.info[:replicas].length <=> b.info[:replicas].length
        }
        sorted[0]
    end

    def check_cluster(opt={})
        xputs ">>> Performing Cluster Check (using node #{@nodes[0]})"
        show_nodes if !opt[:quiet]
        check_config_consistency
        check_open_slots
        check_slots_coverage
    end

    def show_cluster_info
        masters = 0
        keys = 0
        @nodes.each{|n|
            if n.has_flag?("master")
                puts "#{n} (#{n.info[:name][0...8]}...) -> #{n.r.dbsize} keys | #{n.slots.length} slots | "+
                     "#{n.info[:replicas].length} slaves."
                masters += 1
                keys += n.r.dbsize
            end
        }
        xputs "[OK] #{keys} keys in #{masters} masters."
        keys_per_slot = sprintf("%.2f",keys/16384.0)
        puts "#{keys_per_slot} keys per slot on average."
    end

    # Merge slots of every known node. If the number of resulting slots equals
    # ClusterHashSlots, then all slots are served.
    def covered_slots
        slots = {}
        @nodes.each{|n|
            slots = slots.merge(n.slots)
        }
        slots
    end

    def check_slots_coverage
        xputs ">>> Check slots coverage..."
        slots = covered_slots
        if slots.length == ClusterHashSlots
            xputs "[OK] All #{ClusterHashSlots} slots covered."
        else
            cluster_error \
                "[ERR] Not all #{ClusterHashSlots} slots are covered by nodes."
            fix_slots_coverage if @fix
        end
    end

    def check_open_slots
        xputs ">>> Check for open slots..."
        open_slots = []
        @nodes.each{|n|
            if n.info[:migrating].size > 0
                cluster_error \
                    "[WARNING] Node #{n} has slots in migrating state (#{n.info[:migrating].keys.join(",")})."
                open_slots += n.info[:migrating].keys
            elsif n.info[:importing].size > 0
                cluster_error \
                    "[WARNING] Node #{n} has slots in importing state (#{n.info[:importing].keys.join(",")})."
                open_slots += n.info[:importing].keys
            end
        }
        open_slots.uniq!
        if open_slots.length > 0
            xputs "[WARNING] The following slots are open: #{open_slots.join(",")}"
        end
        if @fix
            open_slots.each{|slot| fix_open_slot slot}
        end
    end

    def nodes_with_keys_in_slot(slot)
        nodes = []
        @nodes.each{|n|
            next if n.has_flag?("slave")
            nodes << n if n.r.cluster("getkeysinslot",slot,1).length > 0
        }
        nodes
    end

    def fix_slots_coverage
        not_covered = (0...ClusterHashSlots).to_a - covered_slots.keys
        xputs ">>> Fixing slots coverage..."
        xputs "List of not covered slots: " + not_covered.join(",")

        # For every slot, take action depending on the actual condition:
        # 1) No node has keys for this slot.
        # 2) A single node has keys for this slot.
        # 3) Multiple nodes have keys for this slot.
        slots = {}
        not_covered.each{|slot|
            nodes = nodes_with_keys_in_slot(slot)
            slots[slot] = nodes
            xputs "Slot #{slot} has keys in #{nodes.length} nodes: #{nodes.join(", ")}"
        }

        none = slots.select {|k,v| v.length == 0}
        single = slots.select {|k,v| v.length == 1}
        multi = slots.select {|k,v| v.length > 1}

        # Handle case "1": keys in no node.
        if none.length > 0
            xputs "The folowing uncovered slots have no keys across the cluster:"
            xputs none.keys.join(",")
            yes_or_die "Fix these slots by covering with a random node?"
            none.each{|slot,nodes|
                node = @nodes.sample
                xputs ">>> Covering slot #{slot} with #{node}"
                node.r.cluster("addslots",slot)
            }
        end

        # Handle case "2": keys only in one node.
        if single.length > 0
            xputs "The folowing uncovered slots have keys in just one node:"
            puts single.keys.join(",")
            yes_or_die "Fix these slots by covering with those nodes?"
            single.each{|slot,nodes|
                xputs ">>> Covering slot #{slot} with #{nodes[0]}"
                nodes[0].r.cluster("addslots",slot)
            }
        end

        # Handle case "3": keys in multiple nodes.
        if multi.length > 0
            xputs "The folowing uncovered slots have keys in multiple nodes:"
            xputs multi.keys.join(",")
            yes_or_die "Fix these slots by moving keys into a single node?"
            multi.each{|slot,nodes|
                target = get_node_with_most_keys_in_slot(nodes,slot)
                xputs ">>> Covering slot #{slot} moving keys to #{target}"

                target.r.cluster('addslots',slot)
                target.r.cluster('setslot',slot,'stable')
                nodes.each{|src|
                    next if src == target
                    # Set the source node in 'importing' state (even if we will
                    # actually migrate keys away) in order to avoid receiving
                    # redirections for MIGRATE.
                    src.r.cluster('setslot',slot,'importing',target.info[:name])
                    move_slot(src,target,slot,:dots=>true,:fix=>true,:cold=>true)
                    src.r.cluster('setslot',slot,'stable')
                }
            }
        end
    end

    # Return the owners of the specified slot
    def get_slot_owners(slot)
        owners = []
        @nodes.each{|n|
            next if n.has_flag?("slave")
            n.slots.each{|s,_|
                owners << n if s == slot
            }
        }
        owners
    end

    # Return the node, among 'nodes' with the greatest number of keys
    # in the specified slot.
    def get_node_with_most_keys_in_slot(nodes,slot)
        best = nil
        best_numkeys = 0
        nodes.each{|n|
            next if n.has_flag?("slave")
            numkeys = n.r.cluster("countkeysinslot",slot)
            if numkeys > best_numkeys || best == nil
                best = n
                best_numkeys = numkeys
            end
        }
        return best
    end

    # Slot 'slot' was found to be in importing or migrating state in one or
    # more nodes. This function fixes this condition by migrating keys where
    # it seems more sensible.
    def fix_open_slot(slot)
        puts ">>> Fixing open slot #{slot}"

        # Try to obtain the current slot owner, according to the current
        # nodes configuration.
        owners = get_slot_owners(slot)
        owner = owners[0] if owners.length == 1

        migrating = []
        importing = []
        @nodes.each{|n|
            next if n.has_flag? "slave"
            if n.info[:migrating][slot]
                migrating << n
            elsif n.info[:importing][slot]
                importing << n
            elsif n.r.cluster("countkeysinslot",slot) > 0 && n != owner
                xputs "*** Found keys about slot #{slot} in node #{n}!"
                importing << n
            end
        }
        puts "Set as migrating in: #{migrating.join(",")}"
        puts "Set as importing in: #{importing.join(",")}"

        # If there is no slot owner, set as owner the node with the biggest
        # number of keys in that slot.
        if !owner
            xputs ">>> Nobody claims ownership, selecting an owner..."
            owner = get_node_with_most_keys_in_slot(@nodes,slot)

            # If we still don't have an owner, we can't fix it.
            if !owner
                xputs "[ERR] Can't select a slot owner. Impossible to fix."
                exit 1
            end

            # Use ADDSLOTS to assign the slot.
            puts "*** Configuring #{owner} as the slot owner"
            n.r.cluster("setslot",slot,"stable")
            n.r.cluster("addslot",slot)
            # Make sure this information will propagate. Not strictly needed
            # since there is no past owner, so all the other nodes will accept
            # whatever epoch this node will claim the slot with.
            n.r.cluster("bumpepoch")

            # Remove the owner from the list of migrating/importing
            # nodes.
            migrating.delete(n)
            importing.delete(n)
        end

        # If there are multiple owners of the slot, we need to fix it
        # so that a single node is the owner and all the other nodes
        # are in importing state. Later the fix can be handled by one
        # of the base cases above.
        #
        # Note that this case also covers multiple nodes having the slot
        # in migrating state, since migrating is a valid state only for
        # slot owners.
        if owners.length > 1
            owner = get_node_with_most_keys_in_slot(owners,slot)
            owners.each{|n|
                next if n == owner
                n.r.cluster('delslots',slot)
                n.r.cluster('setslot',slot,'importing',owner.info[:name])
                importing.delete(n) # Avoid duplicates
                importing << n
            }
            owner.r.cluster('bumpepoch')
        end

        # Case 1: The slot is in migrating state in one node, and in
        #         importing state in one node. That's trivial to address.
        if migrating.length == 1 && importing.length == 1
            move_slot(migrating[0],importing[0],slot,:dots=>true,:fix=>true)
        # Case 2: There are multiple nodes that claim the slot as importing;
        # they probably got keys for the slot after a restart and so opened
        # the slot. In this case we just move all the keys to the owner
        # according to the configuration.
        elsif migrating.length == 0 && importing.length > 0
            xputs ">>> Moving all the #{slot} slot keys to its owner #{owner}"
            importing.each {|node|
                next if node == owner
                move_slot(node,owner,slot,:dots=>true,:fix=>true,:cold=>true)
                xputs ">>> Setting #{slot} as STABLE in #{node}"
                node.r.cluster("setslot",slot,"stable")
            }
        # Case 3: There are no nodes claiming the slot in importing state, but
        # there is a migrating node that actually doesn't have any keys. We
        # can just close the slot; probably a reshard was interrupted in the middle.
        elsif importing.length == 0 && migrating.length == 1 &&
              migrating[0].r.cluster("getkeysinslot",slot,10).length == 0
            migrating[0].r.cluster("setslot",slot,"stable")
        else
            xputs "[ERR] Sorry, Redis-trib can't fix this slot yet (work in progress). Slot is set as migrating in #{migrating.join(",")}, as importing in #{importing.join(",")}, owner is #{owner}"
        end
    end

    # Check if all the nodes agree about the cluster configuration
    def check_config_consistency
        if !is_config_consistent?
            cluster_error "[ERR] Nodes don't agree about configuration!"
        else
            xputs "[OK] All nodes agree about slots configuration."
        end
    end

    def is_config_consistent?
        signatures=[]
        @nodes.each{|n|
            signatures << n.get_config_signature
        }
        return signatures.uniq.length == 1
    end

    def wait_cluster_join
        print "Waiting for the cluster to join"
        while !is_config_consistent?
            print "."
            STDOUT.flush
            sleep 1
        end
        print "\n"
    end

    def alloc_slots
        nodes_count = @nodes.length
        masters_count = @nodes.length / (@replicas+1)
        masters = []

        # The first step is to split instances by IP. This is useful as
        # we'll try to allocate master nodes in different physical machines
        # (as much as possible) and to allocate slaves of a given master in
        # different physical machines as well.
        #
        # This code only assumes that if the IP is different, then it is more
        # likely that the instance is running on a different physical host
        # or at least a different virtual machine.
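        #
        # Illustrative example (hypothetical addresses): with six nodes spread
        # over 10.0.0.1 and 10.0.0.2, the interleaving below produces
        # [10.0.0.1:7000, 10.0.0.2:7000, 10.0.0.1:7001, 10.0.0.2:7001, ...],
        # so with --replicas 1 the three masters end up on alternating hosts.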
        ips = {}
        @nodes.each{|n|
            ips[n.info[:host]] = [] if !ips[n.info[:host]]
            ips[n.info[:host]] << n
        }

        # Select master instances
        puts "Using #{masters_count} masters:"
        interleaved = []
        stop = false
        while not stop do
            # Take one node from each IP until we run out of nodes
            # across every IP.
            ips.each do |ip,nodes|
                if nodes.empty?
                    # if this IP has no remaining nodes, check for termination
                    if interleaved.length == nodes_count
                        # stop when 'interleaved' has accumulated all nodes
                        stop = true
                        next
                    end
                else
                    # else, move one node from this IP to 'interleaved'
                    interleaved.push nodes.shift
                end
            end
        end

        masters = interleaved.slice!(0, masters_count)
        nodes_count -= masters.length

        masters.each{|m| puts m}

        # Alloc slots on masters
        slots_per_node = ClusterHashSlots.to_f / masters_count
        first = 0
        cursor = 0.0
        masters.each_with_index{|n,masternum|
            last = (cursor+slots_per_node-1).round
            if last > ClusterHashSlots || masternum == masters.length-1
                last = ClusterHashSlots-1
            end
            last = first if last < first # Min step is 1.
            n.add_slots first..last
            first = last+1
            cursor += slots_per_node
        }
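        # For example (illustrative): with 3 masters, slots_per_node is
        # 16384/3.0 = 5461.33, so the loop above assigns the ranges 0-5460,
        # 5461-10922 and 10923-16383.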

        # Select N replicas for every master.
        # We try to split the replicas among all the IPs with spare nodes
        # trying to avoid the host where the master is running, if possible.
        #
        # Note we loop two times.  The first loop assigns the requested
        # number of replicas to each master.  The second loop assigns any
        # remaining instances as extra replicas to masters.  Some masters
        # may end up with more than their requested number of replicas, but
        # all nodes will be used.
        assignment_verbose = false

        [:requested,:unused].each do |assign|
            masters.each do |m|
                assigned_replicas = 0
                while assigned_replicas < @replicas
                    break if nodes_count == 0
                    if assignment_verbose
                        if assign == :requested
                            puts "Requesting total of #{@replicas} replicas " \
                                 "(#{assigned_replicas} replicas assigned " \
                                 "so far with #{nodes_count} total remaining)."
                        elsif assign == :unused
                            puts "Assigning extra instance to replication " \
                                 "role too (#{nodes_count} remaining)."
                        end
                    end

                    # Return the first node not matching our current master
                    node = interleaved.find{|n| n.info[:host] != m.info[:host]}

                    # If we found a node, use it as a best-first match.
                    # Otherwise, we didn't find a node on a different IP, so we
                    # go ahead and use a same-IP replica.
                    if node
                        slave = node
                        interleaved.delete node
                    else
                        slave = interleaved.shift
                    end
                    slave.set_as_replica(m.info[:name])
                    nodes_count -= 1
                    assigned_replicas += 1
                    puts "Adding replica #{slave} to #{m}"

                    # If we are in the "assign extra nodes" loop,
                    # we want to assign one extra replica to each
                    # master before repeating masters.
                    # This break lets us assign extra replicas to masters
                    # in a round-robin way.
                    break if assign == :unused
                end
            end
        end
    end

    def flush_nodes_config
        @nodes.each{|n|
            n.flush_node_config
        }
    end

    def show_nodes
        @nodes.each{|n|
            xputs n.info_string
        }
    end

    # Redis Cluster config epoch collision resolution code is able to eventually
    # set a different epoch to each node after a new cluster is created, but
    # it is slow compared to assigning a progressive config epoch to each node
    # before joining the cluster. However we just do a best-effort try here,
    # since failing is not a problem.
    def assign_config_epoch
        config_epoch = 1
        @nodes.each{|n|
            begin
                n.r.cluster("set-config-epoch",config_epoch)
            rescue
            end
            config_epoch += 1
        }
    end

    def join_cluster
        # We use a brute force approach to make sure the nodes will meet
        # each other, that is, sending CLUSTER MEET messages to all the nodes
        # about the very same node.
        # Thanks to gossip this information should propagate across the whole
        # cluster in a matter of seconds.
        first = false
        @nodes.each{|n|
            if !first then first = n.info; next; end # Skip the first node
            n.r.cluster("meet",first[:host],first[:port])
        }
    end

    def yes_or_die(msg)
        print "#{msg} (type 'yes' to accept): "
        STDOUT.flush
        if !(STDIN.gets.chomp.downcase == "yes")
            xputs "*** Aborting..."
            exit 1
        end
    end

    def load_cluster_info_from_node(nodeaddr)
        node = ClusterNode.new(nodeaddr)
        node.connect(:abort => true)
        node.assert_cluster
        node.load_info(:getfriends => true)
        add_node(node)
        node.friends.each{|f|
            next if f[:flags].index("noaddr") ||
                    f[:flags].index("disconnected") ||
                    f[:flags].index("fail")
            fnode = ClusterNode.new(f[:addr])
            fnode.connect()
            next if !fnode.r
            begin
                fnode.load_info()
                add_node(fnode)
            rescue => e
                xputs "[ERR] Unable to load info for node #{fnode}"
            end
        }
        populate_nodes_replicas_info
    end

    # This function is called by load_cluster_info_from_node in order to
    # add additional information to every node as a list of replicas.
    def populate_nodes_replicas_info
        # Start adding the new field to every node.
        @nodes.each{|n|
            n.info[:replicas] = []
        }

        # Populate the replicas field using the replicate field of slave
        # nodes.
        @nodes.each{|n|
            if n.info[:replicate]
                master = get_node_by_name(n.info[:replicate])
                if !master
                    xputs "*** WARNING: #{n} claims to be slave of unknown node ID #{n.info[:replicate]}."
                else
                    master.info[:replicas] << n
                end
            end
        }
    end

    # Given a list of source nodes return a "resharding plan"
    # with what slots to move in order to move "numslots" slots to another
    # instance.
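    #
    # Illustrative example (hypothetical numbers): asking for 10 slots from two
    # sources serving 10922 and 5462 slots takes ceil(6.67)=7 slots from the
    # bigger one and floor(3.33)=3 from the smaller one.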
    def compute_reshard_table(sources,numslots)
        moved = []
        # Sort from bigger to smaller instance, for two reasons:
        # 1) If we take less slots than instances it is better to start
        #    getting from the biggest instances.
        # 2) We take one slot more from the first instance in the case of not
        #    perfect divisibility. Like we have 3 nodes and need to get 10
        #    slots, we take 4 from the first, and 3 from the rest. So the
        #    biggest is always the first.
        sources = sources.sort{|a,b| b.slots.length <=> a.slots.length}
        source_tot_slots = sources.inject(0) {|sum,source|
            sum+source.slots.length
        }
        sources.each_with_index{|s,i|
            # Every node will provide a number of slots proportional to the
            # slots it has assigned.
            n = (numslots.to_f/source_tot_slots*s.slots.length)
            if i == 0
                n = n.ceil
            else
                n = n.floor
            end
            s.slots.keys.sort[(0...n)].each{|slot|
                if moved.length < numslots
                    moved << {:source => s, :slot => slot}
                end
            }
        }
        return moved
    end

    def show_reshard_table(table)
        table.each{|e|
            puts "    Moving slot #{e[:slot]} from #{e[:source].info[:name]}"
        }
    end

    # Move slots between source and target nodes using MIGRATE.
    #
    # Options:
    # :dots    -- Print a dot for every moved key.
    # :fix     -- We are moving in the context of a fix. Use REPLACE.
    # :cold    -- Move keys without opening slots / reconfiguring the nodes.
    # :update  -- Update nodes.info[:slots] for source/target nodes.
    # :quiet   -- Don't print info messages.
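    #
    # Illustrative call (hypothetical nodes and slot number): during a fix, a
    # slot is typically moved with progress dots and BUSYKEY replacement:
    #   move_slot(src,dst,1234,:dots=>true,:fix=>true)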
    def move_slot(source,target,slot,o={})
        o = {:pipeline => MigrateDefaultPipeline}.merge(o)

        # We start marking the slot as importing in the destination node,
        # and the slot as migrating in the source node. Note that the order of
        # the operations is important, as otherwise a client may be redirected
        # to the target node that does not yet know it is importing this slot.
        if !o[:quiet]
            print "Moving slot #{slot} from #{source} to #{target}: "
            STDOUT.flush
        end

        if !o[:cold]
            target.r.cluster("setslot",slot,"importing",source.info[:name])
            source.r.cluster("setslot",slot,"migrating",target.info[:name])
        end
        # Migrate all the keys from source to target using the MIGRATE command
        while true
            keys = source.r.cluster("getkeysinslot",slot,o[:pipeline])
            break if keys.length == 0
            begin
                source.r.client.call(["migrate",target.info[:host],target.info[:port],"",0,@timeout,:keys,*keys])
            rescue => e
                if o[:fix] && e.to_s =~ /BUSYKEY/
                    xputs "*** Target key exists. Replacing it for FIX."
                    source.r.client.call(["migrate",target.info[:host],target.info[:port],"",0,@timeout,:replace,:keys,*keys])
                else
                    puts ""
                    xputs "[ERR] #{e}"
                    exit 1
                end
            end
            print "."*keys.length if o[:dots]
            STDOUT.flush
        end

        puts if !o[:quiet]
        # Set the new node as the owner of the slot in all the known nodes.
        if !o[:cold]
            @nodes.each{|n|
                next if n.has_flag?("slave")
                n.r.cluster("setslot",slot,"node",target.info[:name])
            }
        end

        # Update the node logical config
        if o[:update] then
            source.info[:slots].delete(slot)
            target.info[:slots][slot] = true
        end
    end

    # redis-trib subcommands implementations.

    def check_cluster_cmd(argv,opt)
        load_cluster_info_from_node(argv[0])
        check_cluster
    end

    def info_cluster_cmd(argv,opt)
        load_cluster_info_from_node(argv[0])
        show_cluster_info
    end

    def rebalance_cluster_cmd(argv,opt)
        opt = {
            'pipeline' => MigrateDefaultPipeline,
            'threshold' => RebalanceDefaultThreshold
        }.merge(opt)

        # Load nodes info before parsing options, otherwise we can't
        # handle --weight.
        load_cluster_info_from_node(argv[0])

        # Options parsing
        threshold = opt['threshold'].to_i
        autoweights = opt['auto-weights']
        weights = {}
        opt['weight'].each{|w|
            fields = w.split("=")
            node = get_node_by_abbreviated_name(fields[0])
            if !node || !node.has_flag?("master")
                puts "*** No such master node #{fields[0]}"
                exit 1
            end
            weights[node.info[:name]] = fields[1].to_f
        } if opt['weight']
        useempty = opt['use-empty-masters']

        # Assign a weight to each node, and compute the total cluster weight.
        total_weight = 0
        nodes_involved = 0
        @nodes.each{|n|
            if n.has_flag?("master")
                next if !useempty && n.slots.length == 0
                n.info[:w] = weights[n.info[:name]] ? weights[n.info[:name]] : 1
                total_weight += n.info[:w]
                nodes_involved += 1
            end
        }

        # Check cluster, only proceed if it looks sane.
        check_cluster(:quiet => true)
        if @errors.length != 0
            puts "*** Please fix your cluster problems before rebalancing"
            exit 1
        end

        # Calculate the slots balance for each node. It's the number of
        # slots the node should lose (if positive) or gain (if negative)
        # in order to be balanced.
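        # For example (illustrative numbers): with total_weight 4 and weight 2,
        # a node is expected to serve (16384/4)*2 = 8192 slots; if it currently
        # serves 8292 it gets balance +100 and must give away 100 slots.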
        threshold = opt['threshold'].to_f
        threshold_reached = false
        @nodes.each{|n|
            if n.has_flag?("master")
                next if !n.info[:w]
                expected = ((ClusterHashSlots.to_f / total_weight) *
                            n.info[:w]).to_i
                n.info[:balance] = n.slots.length - expected
                # Compute the percentage of difference between the
                # expected number of slots and the real one, to see
                # if it's over the threshold specified by the user.
                over_threshold = false
                if threshold > 0
                    if n.slots.length > 0
                        err_perc = (100-(100.0*expected/n.slots.length)).abs
                        over_threshold = true if err_perc > threshold
                    elsif expected > 0
                        over_threshold = true
                    end
                end
                threshold_reached = true if over_threshold
            end
        }
        if !threshold_reached
            xputs "*** No rebalancing needed! All nodes are within the #{threshold}% threshold."
            return
        end

        # Only consider nodes we want to change
        sn = @nodes.select{|n|
            n.has_flag?("master") && n.info[:w]
        }

        # Because of rounding, it is possible that the balance of all nodes
        # summed does not give 0. Make sure that nodes that have to provide
        # slots are always matched by nodes receiving slots.
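        # For example (illustrative): balances [+3,-1,-1] sum to +1, so one of
        # the receiving nodes is pushed to -2 until the total is zero again.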
        total_balance = sn.map{|x| x.info[:balance]}.reduce{|a,b| a+b}
        while total_balance > 0
            sn.each{|n|
                if n.info[:balance] < 0 && total_balance > 0
                    n.info[:balance] -= 1
                    total_balance -= 1
                end
            }
        end

        # Sort nodes by their slots balance.
        sn = sn.sort{|a,b|
            a.info[:balance] <=> b.info[:balance]
        }

        xputs ">>> Rebalancing across #{nodes_involved} nodes. Total weight = #{total_weight}"

        if $verbose
            sn.each{|n|
                puts "#{n} balance is #{n.info[:balance]} slots"
            }
        end

        # Now we have at the start of the 'sn' array the nodes that should
        # receive slots, and at the end the nodes that must give slots away.
        # We take two indexes, one at the start and one at the end,
        # incrementing or decrementing the indexes accordingly until we
        # find nodes that need to get/provide slots.
        dst_idx = 0
        src_idx = sn.length - 1

        while dst_idx < src_idx
            dst = sn[dst_idx]
            src = sn[src_idx]
            numslots = [dst.info[:balance],src.info[:balance]].map{|n|
                n.abs
            }.min

            if numslots > 0
                puts "Moving #{numslots} slots from #{src} to #{dst}"

                # Actually move the slots.
                reshard_table = compute_reshard_table([src],numslots)
                if reshard_table.length != numslots
                    xputs "*** Assertio failed: Reshard table != number of slots"
                    exit 1
                end
                if opt['simulate']
                    print "#"*reshard_table.length
                else
                    reshard_table.each{|e|
                        move_slot(e[:source],dst,e[:slot],
                            :quiet=>true,
                            :dots=>false,
                            :update=>true,
                            :pipeline=>opt['pipeline'])
                        print "#"
                        STDOUT.flush
                    }
                end
                puts
            end

            # Update nodes balance.
            dst.info[:balance] += numslots
            src.info[:balance] -= numslots
            dst_idx += 1 if dst.info[:balance] == 0
            src_idx -= 1 if src.info[:balance] == 0
        end
    end

    def fix_cluster_cmd(argv,opt)
        @fix = true
        @timeout = opt['timeout'].to_i if opt['timeout']

        load_cluster_info_from_node(argv[0])
        check_cluster
    end

    def reshard_cluster_cmd(argv,opt)
        opt = {'pipeline' => MigrateDefaultPipeline}.merge(opt)

        load_cluster_info_from_node(argv[0])
        check_cluster
        if @errors.length != 0
            puts "*** Please fix your cluster problems before resharding"
            exit 1
        end

        @timeout = opt['timeout'].to_i if opt['timeout']

        # Get number of slots
        if opt['slots']
            numslots = opt['slots'].to_i
        else
            numslots = 0
            while numslots <= 0 or numslots > ClusterHashSlots
                print "How many slots do you want to move (from 1 to #{ClusterHashSlots})? "
                numslots = STDIN.gets.to_i
            end
        end

        # Get the target instance
        if opt['to']
            target = get_node_by_name(opt['to'])
            if !target || target.has_flag?("slave")
                xputs "*** The specified node is not known or not a master, please retry."
                exit 1
            end
        else
            target = nil
            while not target
                print "What is the receiving node ID? "
                target = get_node_by_name(STDIN.gets.chop)
                if !target || target.has_flag?("slave")
                    xputs "*** The specified node is not known or not a master, please retry."
                    target = nil
                end
            end
        end

        # Get the source instances
        sources = []
        if opt['from']
            opt['from'].split(',').each{|node_id|
                if node_id == "all"
                    sources = "all"
                    break
                end
                src = get_node_by_name(node_id)
                if !src || src.has_flag?("slave")
                    xputs "*** The specified node is not known or is not a master, please retry."
                    exit 1
                end
                sources << src
            }
        else
            xputs "Please enter all the source node IDs."
            xputs "  Type 'all' to use all the nodes as source nodes for the hash slots."
            xputs "  Type 'done' once you entered all the source nodes IDs."
            while true
                print "Source node ##{sources.length+1}:"
                line = STDIN.gets.chop
                src = get_node_by_name(line)
                if line == "done"
                    break
                elsif line == "all"
                    sources = "all"
                    break
                elsif !src || src.has_flag?("slave")
                    xputs "*** The specified node is not known or is not a master, please retry."
                elsif src.info[:name] == target.info[:name]
                    xputs "*** It is not possible to use the target node as source node."
                else
                    sources << src
                end
            end
        end

        if sources.length == 0
            puts "*** No source nodes given, operation aborted"
            exit 1
        end

        # Handle sources == all.
        if sources == "all"
            sources = []
            @nodes.each{|n|
                next if n.info[:name] == target.info[:name]
                next if n.has_flag?("slave")
                sources << n
            }
        end

        # Check if the destination node is the same as any of the source nodes.
        if sources.index(target)
            xputs "*** Target node is also listed among the source nodes!"
            exit 1
        end

        puts "\nReady to move #{numslots} slots."
        puts "  Source nodes:"
        sources.each{|s| puts "    "+s.info_string}
        puts "  Destination node:"
        puts "    #{target.info_string}"
        reshard_table = compute_reshard_table(sources,numslots)
        puts "  Resharding plan:"
        show_reshard_table(reshard_table)
        if !opt['yes']
            print "Do you want to proceed with the proposed reshard plan (yes/no)? "
            yesno = STDIN.gets.chop
            exit(1) if (yesno != "yes")
        end
        reshard_table.each{|e|
            move_slot(e[:source],target,e[:slot],
                :dots=>true,
                :pipeline=>opt['pipeline'])
        }
    end

    # This is a helper function for create_cluster_cmd that verifies that
    # the number of nodes and the specified number of replicas form a valid
    # configuration, with at least three master nodes and enough replicas per master.
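    #
    # For example, with --replicas 1 at least 6 nodes are required
    # (3 masters, each with one replica).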
    def check_create_parameters
        masters = @nodes.length/(@replicas+1)
        if masters < 3
            puts "*** ERROR: Invalid configuration for cluster creation."
            puts "*** Redis Cluster requires at least 3 master nodes."
            puts "*** This is not possible with #{@nodes.length} nodes and #{@replicas} replicas per node."
            puts "*** At least #{3*(@replicas+1)} nodes are required."
            exit 1
        end
    end

    def create_cluster_cmd(argv,opt)
        opt = {'replicas' => 0}.merge(opt)
        @replicas = opt['replicas'].to_i

        xputs ">>> Creating cluster"
        argv[0..-1].each{|n|
            node = ClusterNode.new(n)
            node.connect(:abort => true)
            node.assert_cluster
            node.load_info
            node.assert_empty
            add_node(node)
        }
        check_create_parameters
        xputs ">>> Performing hash slots allocation on #{@nodes.length} nodes..."
        alloc_slots
        show_nodes
        yes_or_die "Can I set the above configuration?"
        flush_nodes_config
        xputs ">>> Nodes configuration updated"
        xputs ">>> Assign a different config epoch to each node"
        assign_config_epoch
        xputs ">>> Sending CLUSTER MEET messages to join the cluster"
        join_cluster
        # Give one second for the join to start, in order to avoid that
        # wait_cluster_join finds all the nodes already in agreement about the
        # config simply because they are still empty, with no assigned slots.
        sleep 1
        wait_cluster_join
        flush_nodes_config # Useful for the replicas
        check_cluster
    end

    def addnode_cluster_cmd(argv,opt)
        xputs ">>> Adding node #{argv[0]} to cluster #{argv[1]}"

        # Check the existing cluster
        load_cluster_info_from_node(argv[1])
        check_cluster

        # If --master-id was specified, try to resolve it now so that we
        # abort before starting with the node configuration.
        if opt['slave']
            if opt['master-id']
                master = get_node_by_name(opt['master-id'])
                if !master
                    xputs "[ERR] No such master ID #{opt['master-id']}"
                    exit 1
                end
            else
                master = get_master_with_least_replicas
                xputs "Automatically selected master #{master}"
            end
        end

        # Add the new node
        new = ClusterNode.new(argv[0])
        new.connect(:abort => true)
        new.assert_cluster
        new.load_info
        new.assert_empty
        first = @nodes.first.info
        add_node(new)

        # Send CLUSTER MEET command to the new node
        xputs ">>> Send CLUSTER MEET to node #{new} to make it join the cluster."
        new.r.cluster("meet",first[:host],first[:port])

        # Additional configuration is needed if the node is added as
        # a slave.
        if opt['slave']
            wait_cluster_join
            xputs ">>> Configure node as replica of #{master}."
            new.r.cluster("replicate",master.info[:name])
        end
        xputs "[OK] New node added correctly."
    end

    def delnode_cluster_cmd(argv,opt)
        id = argv[1].downcase
        xputs ">>> Removing node #{id} from cluster #{argv[0]}"

        # Load cluster information
        load_cluster_info_from_node(argv[0])

        # Check that the node exists and is empty (it serves no slots)
        node = get_node_by_name(id)

        if !node
            xputs "[ERR] No such node ID #{id}"
            exit 1
        end

        if node.slots.length != 0
            xputs "[ERR] Node #{node} is not empty! Reshard data away and try again."
            exit 1
        end

        # Send CLUSTER FORGET to all the nodes but the node to remove
        xputs ">>> Sending CLUSTER FORGET messages to the cluster..."
        @nodes.each{|n|
            next if n == node
            if n.info[:replicate] && n.info[:replicate].downcase == id
                # Reconfigure the slave to replicate with some other node
                master = get_master_with_least_replicas
                xputs ">>> #{n} as replica of #{master}"
                n.r.cluster("replicate",master.info[:name])
            end
            n.r.cluster("forget",argv[1])
        }

        # Finally shutdown the node
        xputs ">>> SHUTDOWN the node."
        node.r.shutdown
    end

    def set_timeout_cluster_cmd(argv,opt)
        timeout = argv[1].to_i
        if timeout < 100
            puts "Setting a node timeout of less than 100 milliseconds is a bad idea."
            exit 1
        end

        # Load cluster information
        load_cluster_info_from_node(argv[0])
        ok_count = 0
        err_count = 0

        # Send CONFIG SET cluster-node-timeout + CONFIG REWRITE to every node.
        xputs ">>> Reconfiguring node timeout in every cluster node..."
        @nodes.each{|n|
            begin
                n.r.config("set","cluster-node-timeout",timeout)
                n.r.config("rewrite")
                ok_count += 1
                xputs "*** New timeout set for #{n}"
            rescue => e
                puts "ERR setting node-timeot for #{n}: #{e}"
                err_count += 1
            end
        }
        xputs ">>> New node timeout set. #{ok_count} OK, #{err_count} ERR."
    end
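
    # Illustrative invocation of the set-timeout subcommand implemented above:
    #
    #   ./redis-trib.rb set-timeout 127.0.0.1:7000 15000
    #
    # This sets cluster-node-timeout to 15000 milliseconds on every reachable
    # node and persists it with CONFIG REWRITE.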

    def call_cluster_cmd(argv,opt)
        cmd = argv[1..-1]
        cmd[0] = cmd[0].upcase

        # Load cluster information
        load_cluster_info_from_node(argv[0])
        xputs ">>> Calling #{cmd.join(" ")}"
        @nodes.each{|n|
            begin
                res = n.r.send(*cmd)
                puts "#{n}: #{res}"
            rescue => e
                puts "#{n}: #{e}"
            end
        }
    end
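
    # Illustrative invocation of the call subcommand implemented above, which
    # sends the same command to every node and prints each reply:
    #
    #   ./redis-trib.rb call 127.0.0.1:7000 config get maxmemory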

    def import_cluster_cmd(argv,opt)
        source_addr = opt['from']
        xputs ">>> Importing data from #{source_addr} to cluster #{argv[1]}"
        use_copy = opt['copy']
        use_replace = opt['replace']
        
        # Check the existing cluster.
        load_cluster_info_from_node(argv[0])
        check_cluster

        # Connect to the source node.
        xputs ">>> Connecting to the source Redis instance"
        src_host,src_port = source_addr.split(":")
        source = Redis.new(:host =>src_host, :port =>src_port)
        if source.info['cluster_enabled'].to_i == 1
            xputs "[ERR] The source node should not be a cluster node."
            exit 1
        end
        xputs "*** Importing #{source.dbsize} keys from DB 0"

        # Build a slot -> node map
        slots = {}
        @nodes.each{|n|
            n.slots.each{|s,_|
                slots[s] = n
            }
        }

        # Use SCAN to iterate over the keys, migrating to the
        # right node as needed.
        cursor = nil
        while cursor != 0
            cursor,keys = source.scan(cursor, :count => 1000)
            cursor = cursor.to_i
            keys.each{|k|
                # Migrate keys using the MIGRATE command.
                slot = key_to_slot(k)
                target = slots[slot]
                print "Migrating #{k} to #{target}: "
                STDOUT.flush
                begin
                    cmd = ["migrate",target.info[:host],target.info[:port],k,0,@timeout]
                    cmd << :copy if use_copy
                    cmd << :replace if use_replace
                    source.client.call(cmd)
                rescue => e
                    puts e
                else
                    puts "OK"
                end
            }
        end
    end
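
    # Illustrative invocation of the import subcommand implemented above
    # (addresses are placeholders): keys are read from the standalone
    # instance given with --from and migrated into the cluster node given
    # as argument.
    #
    #   ./redis-trib.rb import --from 127.0.0.1:6379 127.0.0.1:7000
    #
    # --copy keeps the keys on the source instance, --replace overwrites
    # keys that already exist in the target cluster.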

    def help_cluster_cmd(argv,opt)
        show_help
        exit 0
    end

    # Parse the options for the specific command "cmd".
    # Returns a hash populated with option => value pairs, and the index of
    # the first non-option argument in ARGV.
    def parse_options(cmd)
        idx = 1 ; # Current index into ARGV
        options={}
        while idx < ARGV.length && ARGV[idx][0..1] == '--'
            if ARGV[idx][0..1] == "--"
                option = ARGV[idx][2..-1]
                idx += 1

                # --verbose is a global option
                if option == "verbose"
                    $verbose = true
                    next
                end

                if ALLOWED_OPTIONS[cmd] == nil || ALLOWED_OPTIONS[cmd][option] == nil
                    puts "Unknown option '#{option}' for command '#{cmd}'"
                    exit 1
                end
                if ALLOWED_OPTIONS[cmd][option] != false
                    value = ARGV[idx]
                    idx += 1
                else
                    value = true
                end

                # If the option is specified as [], it's a multiple-arguments
                # option: we just queue every new value into an array.
                if ALLOWED_OPTIONS[cmd][option] == []
                    options[option] = [] if !options[option]
                    options[option] << value
                else
                    options[option] = value
                end
            else
                # Remaining arguments are not options.
                break
            end
        end

        # Enforce mandatory options
        if ALLOWED_OPTIONS[cmd]
            ALLOWED_OPTIONS[cmd].each {|option,val|
                if !options[option] && val == :required
                    puts "Option '--#{option}' is required "+ \
                         "for subcommand '#{cmd}'"
                    exit 1
                end
            }
        end
        return options,idx
    end
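
    # For example (illustrative values), running
    # "./redis-trib.rb reshard --slots 100 --yes 127.0.0.1:7000" and calling
    # parse_options("reshard") yields:
    #
    #   options => {"slots" => "100", "yes" => true}   # values stay strings
    #   idx     => 4   # index of "127.0.0.1:7000" in ARGV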
end

#################################################################################
# Libraries
#
# We try not to depend on external libs, since this is a critical part
# of Redis Cluster.
#################################################################################

# This is the CRC16 algorithm used by Redis Cluster to hash keys.
# Implementation according to CCITT standards.
#
# This is actually the XMODEM CRC 16 algorithm, using the
# following parameters:
#
# Name                       : "XMODEM", also known as "ZMODEM", "CRC-16/ACORN"
# Width                      : 16 bit
# Poly                       : 1021 (That is actually x^16 + x^12 + x^5 + 1)
# Initialization             : 0000
# Reflect Input byte         : False
# Reflect Output CRC         : False
# Xor constant to output CRC : 0000
# Output for "123456789"     : 31C3

module RedisClusterCRC16
    def RedisClusterCRC16.crc16(bytes)
        crc = 0
        bytes.each_byte{|b|
            crc = ((crc<<8) & 0xffff) ^ XMODEMCRC16Lookup[((crc>>8)^b) & 0xff]
        }
        crc
    end

private
    XMODEMCRC16Lookup = [
        0x0000,0x1021,0x2042,0x3063,0x4084,0x50a5,0x60c6,0x70e7,
        0x8108,0x9129,0xa14a,0xb16b,0xc18c,0xd1ad,0xe1ce,0xf1ef,
        0x1231,0x0210,0x3273,0x2252,0x52b5,0x4294,0x72f7,0x62d6,
        0x9339,0x8318,0xb37b,0xa35a,0xd3bd,0xc39c,0xf3ff,0xe3de,
        0x2462,0x3443,0x0420,0x1401,0x64e6,0x74c7,0x44a4,0x5485,
        0xa56a,0xb54b,0x8528,0x9509,0xe5ee,0xf5cf,0xc5ac,0xd58d,
        0x3653,0x2672,0x1611,0x0630,0x76d7,0x66f6,0x5695,0x46b4,
        0xb75b,0xa77a,0x9719,0x8738,0xf7df,0xe7fe,0xd79d,0xc7bc,
        0x48c4,0x58e5,0x6886,0x78a7,0x0840,0x1861,0x2802,0x3823,
        0xc9cc,0xd9ed,0xe98e,0xf9af,0x8948,0x9969,0xa90a,0xb92b,
        0x5af5,0x4ad4,0x7ab7,0x6a96,0x1a71,0x0a50,0x3a33,0x2a12,
        0xdbfd,0xcbdc,0xfbbf,0xeb9e,0x9b79,0x8b58,0xbb3b,0xab1a,
        0x6ca6,0x7c87,0x4ce4,0x5cc5,0x2c22,0x3c03,0x0c60,0x1c41,
        0xedae,0xfd8f,0xcdec,0xddcd,0xad2a,0xbd0b,0x8d68,0x9d49,
        0x7e97,0x6eb6,0x5ed5,0x4ef4,0x3e13,0x2e32,0x1e51,0x0e70,
        0xff9f,0xefbe,0xdfdd,0xcffc,0xbf1b,0xaf3a,0x9f59,0x8f78,
        0x9188,0x81a9,0xb1ca,0xa1eb,0xd10c,0xc12d,0xf14e,0xe16f,
        0x1080,0x00a1,0x30c2,0x20e3,0x5004,0x4025,0x7046,0x6067,
        0x83b9,0x9398,0xa3fb,0xb3da,0xc33d,0xd31c,0xe37f,0xf35e,
        0x02b1,0x1290,0x22f3,0x32d2,0x4235,0x5214,0x6277,0x7256,
        0xb5ea,0xa5cb,0x95a8,0x8589,0xf56e,0xe54f,0xd52c,0xc50d,
        0x34e2,0x24c3,0x14a0,0x0481,0x7466,0x6447,0x5424,0x4405,
        0xa7db,0xb7fa,0x8799,0x97b8,0xe75f,0xf77e,0xc71d,0xd73c,
        0x26d3,0x36f2,0x0691,0x16b0,0x6657,0x7676,0x4615,0x5634,
        0xd94c,0xc96d,0xf90e,0xe92f,0x99c8,0x89e9,0xb98a,0xa9ab,
        0x5844,0x4865,0x7806,0x6827,0x18c0,0x08e1,0x3882,0x28a3,
        0xcb7d,0xdb5c,0xeb3f,0xfb1e,0x8bf9,0x9bd8,0xabbb,0xbb9a,
        0x4a75,0x5a54,0x6a37,0x7a16,0x0af1,0x1ad0,0x2ab3,0x3a92,
        0xfd2e,0xed0f,0xdd6c,0xcd4d,0xbdaa,0xad8b,0x9de8,0x8dc9,
        0x7c26,0x6c07,0x5c64,0x4c45,0x3ca2,0x2c83,0x1ce0,0x0cc1,
        0xef1f,0xff3e,0xcf5d,0xdf7c,0xaf9b,0xbfba,0x8fd9,0x9ff8,
        0x6e17,0x7e36,0x4e55,0x5e74,0x2e93,0x3eb2,0x0ed1,0x1ef0
    ]
end
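
# Quick sanity check for the implementation above (illustrative, not
# executed at load time): the documented check value for "123456789" is
# 0x31C3.
#
#   RedisClusterCRC16.crc16("123456789")   # => 0x31c3 (12739)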

# Turn a key name into the corresponding Redis Cluster slot.
def key_to_slot(key)
    # Only hash what is inside {...} if there is such a pattern in the key.
    # Note that the specification requires hashing the content between the
    # first { and the first } that follows it. If we find {} with nothing
    # in the middle, the whole key is hashed as usual.
    s = key.index "{"
    if s
        e = key.index "}",s+1
        if e && e != s+1
            key = key[s+1..e-1]
        end
    end
    RedisClusterCRC16.crc16(key) % 16384
end
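
# Illustrative examples of the hash tag rule implemented above (exact slot
# numbers depend on the CRC of the hashed substring):
#
#   key_to_slot("{user:1}.name")   # only "user:1" is hashed
#   key_to_slot("{user:1}.age")    # same slot as the previous key
#   key_to_slot("foo{}bar")        # empty tag: the whole key is hashed
#   key_to_slot("plainkey")        # no tag: the whole key is hashed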

#################################################################################
# Definition of commands
#################################################################################

COMMANDS={
    "create"  => ["create_cluster_cmd", -2, "host1:port1 ... hostN:portN"],
    "check"   => ["check_cluster_cmd", 2, "host:port"],
    "info"    => ["info_cluster_cmd", 2, "host:port"],
    "fix"     => ["fix_cluster_cmd", 2, "host:port"],
    "reshard" => ["reshard_cluster_cmd", 2, "host:port"],
    "rebalance" => ["rebalance_cluster_cmd", -2, "host:port"],
    "add-node" => ["addnode_cluster_cmd", 3, "new_host:new_port existing_host:existing_port"],
    "del-node" => ["delnode_cluster_cmd", 3, "host:port node_id"],
    "set-timeout" => ["set_timeout_cluster_cmd", 3, "host:port milliseconds"],
    "call" =>    ["call_cluster_cmd", -3, "host:port command arg arg .. arg"],
    "import" =>  ["import_cluster_cmd", 2, "host:port"],
    "help"    => ["help_cluster_cmd", 1, "(show this help)"]
}

ALLOWED_OPTIONS={
    "create" => {"replicas" => true},
    "add-node" => {"slave" => false, "master-id" => true},
    "import" => {"from" => :required, "copy" => false, "replace" => false},
    "reshard" => {"from" => true, "to" => true, "slots" => true, "yes" => false, "timeout" => true, "pipeline" => true},
    "rebalance" => {"weight" => [], "auto-weights" => false, "use-empty-masters" => false, "timeout" => true, "simulate" => false, "pipeline" => true, "threshold" => true},
    "fix" => {"timeout" => MigrateDefaultTimeout},
}
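
# In the table above, the value associated with each option name encodes how
# parse_options treats it: false means a boolean flag taking no argument,
# [] means the option may be given multiple times and its values are
# collected into an array, :required means the option takes an argument and
# must be present, and any other truthy value means the option simply takes
# an argument.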

def show_help
    puts "Usage: redis-trib <command> <options> <arguments ...>\n\n"
    COMMANDS.each{|k,v|
        o = ""
        puts "  #{k.ljust(15)} #{v[2]}"
        if ALLOWED_OPTIONS[k]
            ALLOWED_OPTIONS[k].each{|optname,has_arg|
                puts "                  --#{optname}" + (has_arg ? " <arg>" : "")
            }
        end
    }
    puts "\nFor check, fix, reshard, del-node, set-timeout you can specify the host and port of any working node in the cluster.\n"
end

# Sanity check
if ARGV.length == 0
    show_help
    exit 1
end

rt = RedisTrib.new
cmd_spec = COMMANDS[ARGV[0].downcase]
if !cmd_spec
    puts "Unknown redis-trib subcommand '#{ARGV[0]}'"
    exit 1
end

# Parse options
cmd_options,first_non_option = rt.parse_options(ARGV[0].downcase)
rt.check_arity(cmd_spec[1],ARGV.length-(first_non_option-1))

# Dispatch
rt.send(cmd_spec[0],ARGV[first_non_option..-1],cmd_options)