<?xml version="1.0" encoding="UTF-8"?>
<feed xmlns="http://www.w3.org/2005/Atom" xml:lang="de">
	<id>https://wiki.ixheim.de/index.php?action=history&amp;feed=atom&amp;title=Vici_python_example</id>
	<title>Vici python example - Versionsgeschichte</title>
	<link rel="self" type="application/atom+xml" href="https://wiki.ixheim.de/index.php?action=history&amp;feed=atom&amp;title=Vici_python_example"/>
	<link rel="alternate" type="text/html" href="https://wiki.ixheim.de/index.php?title=Vici_python_example&amp;action=history"/>
	<updated>2026-06-29T01:05:30Z</updated>
	<subtitle>Versionsgeschichte dieser Seite in Xinux Wiki</subtitle>
	<generator>MediaWiki 1.35.1</generator>
	<entry>
		<id>https://wiki.ixheim.de/index.php?title=Vici_python_example&amp;diff=16139&amp;oldid=prev</id>
		<title>Thomas: Die Seite wurde neu angelegt: „ &lt;pre&gt; # This python script is *not* required to setup and run a tunnel, # rather it shows how an external python script can bring a tunnel up / down and monit…“</title>
		<link rel="alternate" type="text/html" href="https://wiki.ixheim.de/index.php?title=Vici_python_example&amp;diff=16139&amp;oldid=prev"/>
		<updated>2017-12-18T17:24:34Z</updated>

		<summary type="html">&lt;p&gt;Die Seite wurde neu angelegt: „ &amp;lt;pre&amp;gt; # This python script is *not* required to setup and run a tunnel, # rather it shows how an external python script can bring a tunnel up / down and monit…“&lt;/p&gt;
&lt;p&gt;&lt;b&gt;Neue Seite&lt;/b&gt;&lt;/p&gt;&lt;div&gt;&lt;br /&gt;
&amp;lt;pre&amp;gt;&lt;br /&gt;
# This Python script is *not* required to set up and run a tunnel;&lt;br /&gt;
# rather, it shows how an external Python script can bring a tunnel up / down and monitor its status.&lt;br /&gt;
&lt;br /&gt;
import vici&lt;br /&gt;
import multiprocessing&lt;br /&gt;
import collections&lt;br /&gt;
import time&lt;br /&gt;
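# NOTE: unless you are root, you will need to do the following: sudo chmod 777 /var/run/charon.vici&lt;br /&gt;
# CAUTION: mode 777 lets every local user talk to the IKE daemon through this socket; prefer running as root or granting access to a dedicated group.&lt;br /&gt;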
&lt;br /&gt;
# Edit target_connections in VState to list the VPN connections you would like to keep alive.&lt;br /&gt;
# If one of these connections is dropped for any reason, this script restarts it automatically.&lt;br /&gt;
&lt;br /&gt;
class VState(object):&lt;br /&gt;
    &amp;quot;&amp;quot;&amp;quot;holds the VPN state&amp;quot;&amp;quot;&amp;quot;&lt;br /&gt;
    def __init__(self):&lt;br /&gt;
        self.alive = True&lt;br /&gt;
        self.session = vici.Session()&lt;br /&gt;
        self.possible_connections = []&lt;br /&gt;
        self.target_connections = ['rw-2']&lt;br /&gt;
        self.active_connections = []&lt;br /&gt;
    &lt;br /&gt;
class StrongSwan(object):&lt;br /&gt;
    def __init__(self,  queue = None):&lt;br /&gt;
        self.state = VState()&lt;br /&gt;
        self.get_possible_connections()&lt;br /&gt;
    &lt;br /&gt;
    def process_control_connection_in(self):&lt;br /&gt;
        '''placeholder for handling incoming control messages; currently does nothing'''&lt;br /&gt;
        pass&lt;br /&gt;
    &lt;br /&gt;
    def check_interfaces(self):&lt;br /&gt;
        state = self.state&lt;br /&gt;
        for vpn_conn in state.session.list_sas():&lt;br /&gt;
            for key in state.active_connections:&lt;br /&gt;
		try:&lt;br /&gt;
                   print 'key', key&lt;br /&gt;
                   print vpn_conn[key]&lt;br /&gt;
                   print vpn_conn[key]['established']&lt;br /&gt;
                   print vpn_conn[key]['state']&lt;br /&gt;
                   print vpn_conn[key]['local-host']&lt;br /&gt;
                   print vpn_conn[key]['remote-host']&lt;br /&gt;
		except:&lt;br /&gt;
		   pass&lt;br /&gt;
                &lt;br /&gt;
                try:&lt;br /&gt;
                    child = vpn_conn[key]['child-sas']&lt;br /&gt;
                    if child == {}:&lt;br /&gt;
                        child = None&lt;br /&gt;
                except:&lt;br /&gt;
                    print 'tunnel not connected at child level!'&lt;br /&gt;
                    child = None&lt;br /&gt;
                &lt;br /&gt;
                if child is not None:&lt;br /&gt;
                    for child_key in child:&lt;br /&gt;
                        &lt;br /&gt;
                        print 'time: ', time.time(), 'child key', child_key, child[child_key]['bytes-in'], child[child_key]['bytes-out']&lt;br /&gt;
                     &lt;br /&gt;
                        #print 'packets'&lt;br /&gt;
                        #print 'in: ', child[child_key]['packets-in']&lt;br /&gt;
                        #print 'out: ', child[child_key]['packets-out']&lt;br /&gt;
                         &lt;br /&gt;
                        #print 'bytes'&lt;br /&gt;
                        #print 'in: ', child[child_key]['bytes-in']&lt;br /&gt;
                        #print 'out: ', child[child_key]['bytes-out']&lt;br /&gt;
                     &lt;br /&gt;
                        #print child[child_key]['mode']&lt;br /&gt;
                        #print 'ip: ', child[child_key]['local-ts']&lt;br /&gt;
                        #print child[child_key]['remote-ts']&lt;br /&gt;
                        #print 'key: ', child[child_key]['rekey-time']&lt;br /&gt;
                        #print 'life: ', child[child_key]['life-time']&lt;br /&gt;
                    &lt;br /&gt;
                &lt;br /&gt;
                if key in state.target_connections and child is None:&lt;br /&gt;
                    self.connection_down(key)&lt;br /&gt;
                    self.connection_up(key)&lt;br /&gt;
        &lt;br /&gt;
        for key in state.target_connections:&lt;br /&gt;
            if key not in state.active_connections:&lt;br /&gt;
                #the connection is inactive&lt;br /&gt;
                self.connection_up(key)&lt;br /&gt;
                &lt;br /&gt;
        &lt;br /&gt;
    def connection_up(self, key):&lt;br /&gt;
        state = self.state&lt;br /&gt;
        print 'up: ', key&lt;br /&gt;
	sa = collections.OrderedDict()&lt;br /&gt;
	sa['child'] = key&lt;br /&gt;
	sa['timeout'] = '2000'&lt;br /&gt;
	sa['loglevel'] = '0'&lt;br /&gt;
	rep =state.session.initiate(sa)&lt;br /&gt;
	rep.next()&lt;br /&gt;
	rep.close()&lt;br /&gt;
	&lt;br /&gt;
        #TODO: handle errors, log?&lt;br /&gt;
        &lt;br /&gt;
    def connection_down(self, key):&lt;br /&gt;
        state = self.state&lt;br /&gt;
        print 'down: ', key&lt;br /&gt;
	sa = collections.OrderedDict()&lt;br /&gt;
	sa['ike'] = key&lt;br /&gt;
	sa['timeout'] = '2000'&lt;br /&gt;
	sa['loglevel'] = '0'&lt;br /&gt;
	rep =state.session.terminate(sa)&lt;br /&gt;
	rep.next()&lt;br /&gt;
	rep.close()&lt;br /&gt;
    &lt;br /&gt;
	#TODO: handle errors, log?&lt;br /&gt;
&lt;br /&gt;
    def get_possible_connections(self):&lt;br /&gt;
        '''reset and repopulate possible connections based on /etc/ipsec.conf'''&lt;br /&gt;
        state = self.state&lt;br /&gt;
        state.possible_connections = []&lt;br /&gt;
        for conn in state.session.list_conns():&lt;br /&gt;
            for key in conn:&lt;br /&gt;
                state.possible_connections.append(key)&lt;br /&gt;
        &lt;br /&gt;
        print 'p',state.possible_connections&lt;br /&gt;
                &lt;br /&gt;
    def get_active_connections(self):&lt;br /&gt;
        state = self.state&lt;br /&gt;
        state.active_connections = []&lt;br /&gt;
        &lt;br /&gt;
        for conn in state.session.list_sas():&lt;br /&gt;
            for key in conn:&lt;br /&gt;
                state.active_connections.append(key)&lt;br /&gt;
        &lt;br /&gt;
        print 'a', state.active_connections&lt;br /&gt;
                &lt;br /&gt;
    def is_alive(self):&lt;br /&gt;
        return self.state.alive&lt;br /&gt;
&lt;br /&gt;
def main_loop():&lt;br /&gt;
    '''main processing loop'''&lt;br /&gt;
    #make a strongSwan control object&lt;br /&gt;
    VPN = StrongSwan()&lt;br /&gt;
    while VPN.is_alive():&lt;br /&gt;
        VPN.process_control_connection_in()&lt;br /&gt;
        VPN.get_possible_connections()&lt;br /&gt;
        VPN.get_active_connections()&lt;br /&gt;
        VPN.check_interfaces()&lt;br /&gt;
        time.sleep(1.0)&lt;br /&gt;
    &lt;br /&gt;
&lt;br /&gt;
&lt;br /&gt;
if __name__ == '__main__':&lt;br /&gt;
    #run main loop as a process&lt;br /&gt;
    main = multiprocessing.Process(target=main_loop)&lt;br /&gt;
    main.start()&lt;br /&gt;
    main.join() &lt;br /&gt;
&amp;lt;/pre&amp;gt;&lt;/div&gt;</summary>
		<author><name>Thomas</name></author>
	</entry>
</feed>