Słomkowski's technical musings

Playing with software, hardware and touching the sky with a paraglider.

Server resetter from Waveshare Ethernet Relay module


Just a quick one. I needed an emergency reset for the servers in a rack cabinet, the same rack I built a long-lasting UPS setup for. Since I needed it done promptly, I simply bought a Waveshare POE ETH Relay adapter. It is an 8-channel relay module that can be controlled via Modbus TCP. I connected the relay outputs to the RESET headers on the motherboards, in parallel with the main reset buttons.

Cabling

I bought a box of so-called DuPont or Berg connectors. I didn’t have a crimping tool, so I simply soldered them. In my setup, I needed four cables, one for each RESET header. The relays are SPDT; connect each cable to the common and normally open terminals, so that the header is shorted only while the relay is energized. The remaining four relays are unused; I might connect them to the POWER headers if the need arises.

Python program

I configured the relay module according to its documentation. In my setup, it sits in a separate management VLAN. Warning! This module doesn’t have any authentication on Modbus TCP, so anyone who can reach it over the network can switch the relays.

The sample code provided by Waveshare doesn’t work correctly. For some reason, the module requires a CRC16 checksum to be appended to each command, even though Modbus over TCP has no checksum field of its own and relies on TCP for integrity.
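To make the framing concrete: judging by the bytes the script sends, the module accepts what looks like a Modbus RTU frame (device address, function code, coil address, coil value, CRC16 with the low byte first) over a plain TCP socket. Below is a minimal sketch of building such a frame. The names `crc16_modbus` and `build_coil_frame` are illustrative only, and the device address 0x01 is assumed to match the module's configuration.

```python
def crc16_modbus(data: bytes) -> int:
    """CRC-16/MODBUS: initial value 0xFFFF, reflected polynomial 0xA001."""
    crc = 0xFFFF
    for byte in data:
        crc ^= byte
        for _ in range(8):
            crc = (crc >> 1) ^ 0xA001 if crc & 1 else crc >> 1
    return crc


def build_coil_frame(channel: int, on: bool, device: int = 0x01) -> bytes:
    """Build a Write Single Coil (0x05) frame for a 1-based relay channel."""
    # Coil value 0xFF00 switches the relay on, 0x0000 switches it off.
    body = bytes([device, 0x05, 0x00, channel - 1, 0xFF if on else 0x00, 0x00])
    # The CRC is appended low byte first.
    return body + crc16_modbus(body).to_bytes(2, "little")


if __name__ == "__main__":
    print(build_coil_frame(1, True).hex(" "))   # 01 05 00 00 ff 00 8c 3a
    print(build_coil_frame(1, False).hex(" "))  # 01 05 00 00 00 00 cd ca
```

The two printed frames are handy reference values when comparing against a packet capture.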

I created a simple Python script to reset the desired machine:

import argparse
import socket
import sys
from time import sleep

host = "192.168.1.10"  # change this to your relay controller's IP address
port = 4196

# map server identifier to channel
machine_to_relay_channel = {
    'your-server-1': 1,
    'your-server-2': 2,
    'test': 8,
}

# High byte of the coil value: 0xFF00 switches the relay on, 0x0000 switches it off
COM_ON = 0xFF
COM_OFF = 0


def calculate_crc(data):
    """Compute the CRC-16/MODBUS checksum (initial value 0xFFFF, reflected polynomial 0xA001)."""
    crc = 0xFFFF
    for byte in data:
        crc ^= byte
        for _ in range(8):
            if crc & 0x0001:
                crc = (crc >> 1) ^ 0xA001
            else:
                crc >>= 1
    return crc


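def send_cmd(s: socket.socket, n, command):
    """Send a Write Single Coil frame for relay channel n (1-based) and return the raw reply."""
    cmd = [0] * 8
    cmd[0] = 0x01  # Device address
    cmd[1] = 0x05  # Modbus function code: Write Single Coil
    cmd[2] = 0  # Coil address, high byte
    cmd[3] = n - 1  # Coil address, low byte (zero-based)
    cmd[4] = command  # Coil value, high byte (COM_ON or COM_OFF)
    cmd[5] = 0  # Coil value, low byte

    crc = calculate_crc(cmd[:6])
    cmd[6] = crc & 0xFF  # CRC low byte is sent first
    cmd[7] = (crc >> 8) & 0xFF  # CRC high byte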

    s.sendall(bytearray(cmd))
    response = s.recv(1024)
    return response


def reset_machine(machine_name: str):
    """Reset a machine by toggling its relay channel"""
    if machine_name not in machine_to_relay_channel:
        print(f"Error: Unknown machine '{machine_name}'")
        print(f"Available machines: {', '.join(machine_to_relay_channel.keys())}")
        return False

    channel = machine_to_relay_channel[machine_name]

    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(5)  # seconds; avoid hanging forever if the module is unreachable
            s.connect((host, port))
            print(f"Resetting machine '{machine_name}' (channel {channel})...")

            send_cmd(s, channel, COM_ON)
            sleep(1)
            send_cmd(s, channel, COM_OFF)

            print(f"Machine '{machine_name}' has been reset successfully.")
            return True

    except socket.error as e:
        print(f"Error connecting to relay controller: {e}")
        return False
    except Exception as e:
        print(f"Error during reset operation: {e}")
        return False


def main():
    example_reset_calls = "".join([f"  ./resetter.py reset {n}\n" for n in machine_to_relay_channel.keys()][:2])

    parser = argparse.ArgumentParser(
        description='Machine resetter - Controls relay channels to reset machines',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=f'Examples:\n{example_reset_calls}  ./resetter.py list'
    )

    subparsers = parser.add_subparsers(dest='command', help='Available commands')

    reset_parser = subparsers.add_parser('reset', help='Reset a machine')
    reset_parser.add_argument('machine_name',
                              help='Name of the machine to reset')

    subparsers.add_parser('list', help='List available machines')

    args = parser.parse_args()

    if args.command == 'reset':
        success = reset_machine(args.machine_name)
        sys.exit(0 if success else 1)

    elif args.command == 'list':
        print("Available machines:")
        for machine, channel in machine_to_relay_channel.items():
            print(f"  {machine:<10} (channel {channel})")
    else:
        parser.print_help()
        sys.exit(1)


if __name__ == "__main__":
    main()
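
The script discards the module's reply. In Modbus, a successful Write Single Coil request is answered with an echo of the request, so a stricter version could compare the two before reporting success. The sketch below is one way to do that; it also relies on the property that a CRC-16/MODBUS computed over a frame together with its trailing CRC bytes is zero. `check_reply` is a hypothetical helper, and the assumption that this module echoes the full frame including the CRC should be verified against a real device.

```python
def crc16_modbus(data: bytes) -> int:
    """CRC-16/MODBUS: initial value 0xFFFF, reflected polynomial 0xA001."""
    crc = 0xFFFF
    for byte in data:
        crc ^= byte
        for _ in range(8):
            crc = (crc >> 1) ^ 0xA001 if crc & 1 else crc >> 1
    return crc


def check_reply(request: bytes, reply: bytes) -> bool:
    """Return True if reply is an intact echo of request."""
    if len(reply) < 4:
        return False
    # A frame with its CRC appended (low byte first) has a CRC of zero.
    if crc16_modbus(reply) != 0:
        return False
    # Bit 7 set in the function code marks a Modbus exception response.
    if reply[1] & 0x80:
        return False
    return reply == request


if __name__ == "__main__":
    request = bytes.fromhex("01050000ff008c3a")  # channel 1 on
    print(check_reply(request, request))                  # True
    print(check_reply(request, request[:-1] + b"\x00"))   # False
```

In the script, this check could be applied to the value returned by `send_cmd`, treating a failed check the same way as a connection error.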