Source code for reemote.operations.builtin.iptables

# Copyright (c) 2025 Kim Jarvis TPF Software Services S.A. kim.jarvis@tpfsystems.com 
# This software is licensed under the MIT License. See the LICENSE file for details.
#
from reemote.command import Command


[docs] class Iptables: """ A class to encapsulate the functionality of iptables rules management in Linux systems. This class allows users to add, remove, and manage iptables rules with various parameters similar to the ansible.builtin.iptables module. Attributes: action (str): Whether the rule should be appended or inserted. chain (str): Specify the iptables chain to modify. chain_management (bool): If true, create/delete chains as needed. comment (str): Comment to add to the rule. ctstate (list): Connection states to match. destination (str): Destination specification. destination_port (str): Destination port or port range. destination_ports (list): Multiple destination ports. dst_range (str): Destination IP range. flush (bool): Flush all rules from chain/table. fragment (str): Fragment matching specification. gateway (str): Gateway for TEE jump target. gid_owner (str): Group ID for owner matching. goto (str): Continue processing in specified chain. icmp_type (str): ICMP type specification. in_interface (str): Input interface specification. ip_version (str): IP protocol version. jump (str): Target of the rule. limit (str): Rate limiting specification. limit_burst (str): Burst limit specification. log_level (str): Logging level for LOG jump. log_prefix (str): Log prefix for LOG jump. match (list): Extension modules for matching. match_set (str): IP set name for matching. match_set_flags (str): Flags for match_set parameter. numeric (bool): Skip DNS lookup in list operations. out_interface (str): Output interface specification. policy (str): Chain policy setting. protocol (str): Protocol specification. reject_with (str): Error packet type for REJECT. rule_num (str): Rule number for insertion. set_counters (str): Initialize packet/byte counters. set_dscp_mark (str): DSCP mark value. set_dscp_mark_class (str): Predefined DiffServ class. source (str): Source specification. source_port (str): Source port or port range. src_range (str): Source IP range. state (str): Whether rule should be present or absent. syn (str): SYN flag matching. table (str): Packet matching table. tcp_flags (dict): TCP flags specification. to_destination (str): Destination for DNAT. to_ports (str): Port redirection specification. to_source (str): Source for SNAT. uid_owner (str): User ID for owner matching. wait (str): Wait time for xtables lock. guard (bool): If False, commands will not be executed. sudo (bool): If True, execute with sudo privileges. **Examples:** .. code:: python # Block specific IP r = yield Iptables(chain="INPUT", source="8.8.8.8", jump="DROP", sudo=True) # Forward port 80 to 8600 r = yield Iptables( table="nat", chain="PREROUTING", in_interface="eth0", protocol="tcp", match=["tcp"], destination_port="80", jump="REDIRECT", to_ports="8600", comment="Redirect web traffic to port 8600", sudo=True ) # Allow related and established connections r = yield Iptables( chain="INPUT", ctstate=["ESTABLISHED", "RELATED"], jump="ACCEPT", sudo=True ) Usage: This class is designed to be used in a generator-based workflow where commands are yielded for execution. Notes: - Commands are constructed based on the provided parameters - Supports both IPv4 and IPv6 through ip_version parameter - All parameters are optional except those required for specific operations """ def __init__(self, action: str = "append", chain: str = None, chain_management: bool = False, comment: str = None, ctstate: list = None, destination: str = None, destination_port: str = None, destination_ports: list = None, dst_range: str = None, flush: bool = False, fragment: str = None, gateway: str = None, gid_owner: str = None, goto: str = None, icmp_type: str = None, in_interface: str = None, ip_version: str = "ipv4", jump: str = None, limit: str = None, limit_burst: str = None, log_level: str = None, log_prefix: str = None, match: list = None, match_set: str = None, match_set_flags: str = None, numeric: bool = False, out_interface: str = None, policy: str = None, protocol: str = None, reject_with: str = None, rule_num: str = None, set_counters: str = None, set_dscp_mark: str = None, set_dscp_mark_class: str = None, source: str = None, source_port: str = None, src_range: str = None, state: str = "present", syn: str = "ignore", table: str = "filter", tcp_flags: dict = None, to_destination: str = None, to_ports: str = None, to_source: str = None, uid_owner: str = None, wait: str = None, guard: bool = True, sudo: bool = False): self.action = action self.chain = chain self.chain_management = chain_management self.comment = comment self.ctstate = ctstate or [] self.destination = destination self.destination_port = destination_port self.destination_ports = destination_ports or [] self.dst_range = dst_range self.flush = flush self.fragment = fragment self.gateway = gateway self.gid_owner = gid_owner self.goto = goto self.icmp_type = icmp_type self.in_interface = in_interface self.ip_version = ip_version self.jump = jump self.limit = limit self.limit_burst = limit_burst self.log_level = log_level self.log_prefix = log_prefix self.match = match or [] self.match_set = match_set self.match_set_flags = match_set_flags self.numeric = numeric self.out_interface = out_interface self.policy = policy self.protocol = protocol self.reject_with = reject_with self.rule_num = rule_num self.set_counters = set_counters self.set_dscp_mark = set_dscp_mark self.set_dscp_mark_class = set_dscp_mark_class self.source = source self.source_port = source_port self.src_range = src_range self.state = state self.syn = syn self.table = table self.tcp_flags = tcp_flags self.to_destination = to_destination self.to_ports = to_ports self.to_source = to_source self.uid_owner = uid_owner self.wait = wait self.guard = guard self.sudo = sudo def __repr__(self): params = [] for key, value in self.__dict__.items(): if value is not None and key not in ['guard', 'sudo']: params.append(f"{key}={value!r}") return f"Iptables({', '.join(params)}, guard={self.guard!r}, sudo={self.sudo!r})" def _build_command(self): """Build the iptables command based on the provided parameters.""" if self.ip_version == "ipv6": cmd_base = "ip6tables" else: cmd_base = "iptables" cmd_parts = [cmd_base] # Add wait parameter if specified if self.wait: cmd_parts.extend(["-w", self.wait]) # Handle flush operation if self.flush: cmd_parts.append("--flush") if self.chain: cmd_parts.append(self.chain) if self.table != "filter": cmd_parts.extend(["-t", self.table]) return " ".join(cmd_parts) # Handle policy setting if self.policy and self.chain: cmd_parts.extend(["-t", self.table, "-P", self.chain, self.policy]) return " ".join(cmd_parts) # Handle chain management if self.chain_management: if self.state == "present": cmd_parts.extend(["-t", self.table, "-N", self.chain]) elif self.state == "absent" and not any([getattr(self, attr) for attr in ['action', 'comment', 'ctstate', 'destination', 'destination_port', 'destination_ports', 'dst_range', 'fragment', 'gateway', 'gid_owner', 'goto', 'icmp_type', 'in_interface', 'jump', 'limit', 'limit_burst', 'log_level', 'log_prefix', 'match', 'match_set', 'match_set_flags', 'out_interface', 'protocol', 'reject_with', 'rule_num', 'set_counters', 'set_dscp_mark', 'set_dscp_mark_class', 'source', 'source_port', 'src_range', 'syn', 'tcp_flags', 'to_destination', 'to_ports', 'to_source', 'uid_owner']]): cmd_parts.extend(["-t", self.table, "-X", self.chain]) return " ".join(cmd_parts) # Build rule command if self.table != "filter": cmd_parts.extend(["-t", self.table]) if self.chain: cmd_parts.extend(["-A" if self.action == "append" else "-I", self.chain]) if self.action == "insert" and self.rule_num: cmd_parts.append(self.rule_num) # Add rule parameters if self.in_interface: cmd_parts.extend(["-i", self.in_interface]) if self.out_interface: cmd_parts.extend(["-o", self.out_interface]) if self.protocol: cmd_parts.extend(["-p", self.protocol]) if self.source: cmd_parts.extend(["-s", self.source]) if self.destination: cmd_parts.extend(["-d", self.destination]) if self.src_range: cmd_parts.extend(["-m", "iprange", "--src-range", self.src_range]) if self.dst_range: cmd_parts.extend(["-m", "iprange", "--dst-range", self.dst_range]) if self.match_set and self.match_set_flags: cmd_parts.extend(["-m", "set", "--match-set", self.match_set, self.match_set_flags]) if self.match: for m in self.match: cmd_parts.extend(["-m", m]) if self.ctstate: cmd_parts.extend(["-m", "conntrack", "--ctstate", ",".join(self.ctstate)]) if self.source_port: cmd_parts.extend(["--sport", self.source_port]) if self.destination_port: cmd_parts.extend(["--dport", self.destination_port]) if self.destination_ports: cmd_parts.extend(["-m", "multiport", "--dports", ",".join(self.destination_ports)]) if self.syn == "match": cmd_parts.append("--syn") elif self.syn == "negate": cmd_parts.append("! --syn") if self.tcp_flags: flags = ",".join(self.tcp_flags.get("flags", [])) flags_set = ",".join(self.tcp_flags.get("flags_set", [])) cmd_parts.extend(["--tcp-flags", flags, flags_set]) if self.fragment: cmd_parts.extend(["-f", self.fragment]) if self.uid_owner: cmd_parts.extend(["-m", "owner", "--uid-owner", self.uid_owner]) if self.gid_owner: cmd_parts.extend(["-m", "owner", "--gid-owner", self.gid_owner]) if self.limit: cmd_parts.extend(["-m", "limit", "--limit", self.limit]) if self.limit_burst: cmd_parts.extend(["--limit-burst", self.limit_burst]) if self.jump: cmd_parts.extend(["-j", self.jump]) elif self.goto: cmd_parts.extend(["-g", self.goto]) if self.to_ports: cmd_parts.extend(["--to-ports", self.to_ports]) if self.to_destination: cmd_parts.extend(["--to-destination", self.to_destination]) if self.to_source: cmd_parts.extend(["--to-source", self.to_source]) if self.reject_with: cmd_parts.extend(["--reject-with", self.reject_with]) if self.gateway: cmd_parts.extend(["--gateway", self.gateway]) if self.set_dscp_mark: cmd_parts.extend(["--set-dscp", self.set_dscp_mark]) elif self.set_dscp_mark_class: cmd_parts.extend(["--set-dscp-class", self.set_dscp_mark_class]) if self.log_prefix: cmd_parts.extend(["--log-prefix", f'"{self.log_prefix}"']) if self.log_level: cmd_parts.extend(["--log-level", self.log_level]) if self.comment: cmd_parts.extend(["-m", "comment", "--comment", f'"{self.comment}"']) if self.set_counters: cmd_parts.extend(["-c", self.set_counters]) return " ".join(cmd_parts) def execute(self): """Execute the iptables command.""" cmd = self._build_command() r = yield Command(cmd, guard=self.guard, sudo=self.sudo) r.changed = True