Python

IWLA 0.5

Thursday, 16 April 2020
|
Written by
Grégory Soutadé

IWLA screenshot

Three years have passed since the last IWLA release. That is mostly down to laziness, with commits left in testing for too long (notably the August 2019 series), but it also shows that the software is fairly stable and does its job. This new version brings:

  • A test mode (dry run)
  • New rules for robot detection (more than 10 404 errors, a page displayed without any element (hit), no page and no hit)
  • A new database format that considerably reduces the stored size: page statistics (viewed and not viewed), hit statistics (viewed and not viewed) and bandwidth are now cumulated instead of being stored day by day. This change makes the database incompatible with previous versions
  • Updated data from the AWSTATS development branch
  • The top_pages_diff plugin
  • The ability to exclude an IP address from the statistics
  • Use of cPickle instead of Pickle
  • Robot requests are no longer saved (saves space)
  • A few bug fixes

As mentioned, these changes are mainly aimed at reducing the memory and disk footprint of the software, which has to run on a server with only 1 GB of RAM.
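The new robot detection rules can be sketched in a few lines. This is only an illustration of the three rules listed above, not IWLA's actual code: the `Visit` class, its field names and the `looks_like_robot` helper are hypothetical.

```python
from dataclasses import dataclass


@dataclass
class Visit:
    """Hypothetical per-visitor counters; IWLA's real structures differ."""
    pages: int = 0        # viewed pages
    hits: int = 0         # viewed non-page elements (images, CSS, ...)
    errors_404: int = 0   # requests answered with a 404


def looks_like_robot(visit, max_404=10):
    """Apply the three rules from the list above to one visitor."""
    if visit.errors_404 > max_404:
        return True       # more than 10 404 errors
    if visit.pages > 0 and visit.hits == 0:
        return True       # page displayed without any hit
    if visit.pages == 0 and visit.hits == 0:
        return True       # no page and no hit
    return False


print(looks_like_robot(Visit(pages=3, hits=0)))    # True
print(looks_like_robot(Visit(pages=3, hits=12)))   # False
```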

Happy downloading!

Max stack usage for C program

Wednesday, 10 April 2019
|
Written by
Grégory Soutadé

Another day, another script. This one helps to compute the maximum stack usage of a C program. In fact, it combines the output of cflow and GCC GNAT to find the heaviest path used (which is not necessarily the deepest). The first one computes the call graph of the target software, while the -fstack-usage option of GCC creates .su files containing the stack usage of every function.

Target programs are simple embedded software. This script is a simple base: it is not intended to work in all cases, and it handles neither dynamic stack usage nor recursive functions (if you wish to add it...).
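To give an idea of the data involved, here is a minimal sketch that parses one line of a .su file with the same regular expression as the script. The `parse_su_line` helper is hypothetical, and the sample line is made up (its column number and qualifier are assumptions); it only follows the tab-separated layout the script expects.

```python
import re

# Same pattern as in the script: <location and name> TAB <size> TAB <qualifier>
su_re = re.compile(r'(.*)\t([0-9]+)\t(.*)')


def parse_su_line(text):
    """Return (file, line, function, stack size, qualifier) or None."""
    match = su_re.match(text)
    if not match:
        return None
    # The first field looks like file:line:column:function
    filename, line, _column, function = match.group(1).split(':')
    return filename, int(line), function, int(match.group(2)), match.group(3)


# Made-up sample line (the column number and qualifier are assumptions)
print(parse_su_line('gget.c:493:5:main\t352\tstatic'))
# ('gget.c', 493, 'main', 352, 'static')
```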

A file version is available here.

#!/usr/bin/env python

import os
import re
import argparse

class SUInfo:
    def __init__(self, filename, line, func_name, stack_size):
        self.filename = filename
        self.line = line
        self.func_name = func_name
        self.stack_size = stack_size

    def __str__(self):
        s = '%s() <%s:%s> %d' % (self.func_name, self.filename, self.line, self.stack_size)
        return s

class FlowElement:
    def __init__(self, root, depth, stack_size, suinfo):
        self.root = root
        self.depth = depth
        self.stack_size = stack_size
        self.suinfo = suinfo
        self.childs = []

    def append(self, suinfo):
        self.childs.append(suinfo)

    def __str__(self):
        spaces = '    ' * self.depth
        su = self.suinfo
        res = '%s-> %s() %d <%s:%d>' % (spaces, su.func_name, su.stack_size,
                                        su.filename, su.line)
        return res

def display_max_path(element):
    print('Max stack size %d' % (element.stack_size))
    print('Max path :')
    res = ''
    while element:
        res = str(element) + '\n' + res
        element = element.root
    print(res)

cflow_re = re.compile(r'([ ]*).*\(\) \<.* at (.*)\>[:]?')

def parse_cflow_file(path, su_dict):
    root = None
    cur_root = None
    current = None
    cur_depth = 0
    max_stack_size = 0
    max_path = None
    with open(path) as f:
        while True:
            line = f.readline()
            if not line: break
            match = cflow_re.match(line)
            if not match: continue

            spaces = match.group(1)
            # Convert tab into 4 spaces
            spaces = spaces.replace('\t', '    ')
            # Integer division: depth is later used to repeat a string
            depth = len(spaces) // 4
            filename = match.group(2)
            (filename, line) = filename.split(':')
            filename = '%s:%s' % (os.path.basename(filename), line)

            suinfo = su_dict.get(filename, None)
            # Some functions may have been inlined
            if not suinfo:
                # print('WARNING: Key %s not found in su dict"' % (filename))
                continue

            if not root:
                root = FlowElement(None, 0, suinfo.stack_size, suinfo)
                cur_root = root
                current = root
                max_path = root
                max_stack_size = suinfo.stack_size
            else:
                # Go back
                if depth < cur_depth:
                    while cur_root.depth > (depth-1):
                        cur_root = cur_root.root
                # Go depth
                elif depth > cur_depth:
                    cur_root = current
                cur_depth = depth
                stack_size = cur_root.stack_size + suinfo.stack_size
                element = FlowElement(cur_root, cur_depth,
                                      stack_size,
                                      suinfo)
                current = element
                if stack_size > max_stack_size:
                    max_stack_size = stack_size
                    max_path = current
                cur_root.append(element)
    display_max_path(max_path)

su_re = re.compile(r'(.*)\t([0-9]+)\t(.*)')

def parse_su_files(path, su_dict):
    for root, dirs, files in os.walk(path):
        for sufile in files:
            if not sufile.endswith('.su'): continue
            # Join with the directory currently walked, not the top-level path
            with open(os.path.join(root, sufile)) as f:
                while True:
                    line = f.readline()
                    if not line: break
                    match = su_re.match(line)
                    if not match:
                        # print('WARNING no match for "%s"' % (line))
                        continue
                    infos = match.group(1)
                    # Third field is the column number, not a size
                    (filename, line, column, function) = infos.split(':')
                    stack_size = int(match.group(2))
                    key = '%s:%s' % (filename, line)
                    su_info = SUInfo(filename, int(line), function, stack_size)
                    su_dict[key] = su_info


if __name__ == '__main__':
    optparser = argparse.ArgumentParser(description='Max static stack size computer')
    optparser.add_argument('-f', '--cflow-file', dest='cflow_file',
                           required=True,
                           help='cflow generated file')
    optparser.add_argument('-d', '--su-dir', dest='su_dir',
                           default='.',
                           help='Directory where GNAT .su files are generated')
    options = optparser.parse_args()

    su_dict = {}

    parse_su_files(options.su_dir, su_dict)
    parse_cflow_file(options.cflow_file, su_dict)

Usage & example

Let's take this simple software as an example.

First, compile your software with the -fstack-usage option in CFLAGS. It will create a .su file for each object file. Then, run cflow on your software. Finally, call my script.

mkdir test
cd test
gcc -fstack-usage gget.c -lpthread -lcurl
cflow gget.c > cflow.res
./cflow.py -f cflow.res

Result:

Max stack size 608
Max path :
-> main() 352 <gget.c:493>
    -> do_transfert() 160 <gget.c:228>
        -> progress_cb() 96 <gget.c:214>
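The heaviest path is simply the call chain whose per-function stack sizes add up to the largest total. Here is a minimal sketch of that idea using the three functions from the result above. The `STACK` and `CALLS` tables and the `heaviest_path` helper are hypothetical: the call graph is reduced to the reported path, and the recursion replaces the line-by-line walk of the cflow output done by the script.

```python
# Per-function stack sizes, taken from the result above.
STACK = {'main': 352, 'do_transfert': 160, 'progress_cb': 96}

# Call graph reduced to the reported path (hypothetical simplification).
CALLS = {
    'main': ['do_transfert'],
    'do_transfert': ['progress_cb'],
    'progress_cb': [],
}


def heaviest_path(func):
    """Return (total stack size, path) of the heaviest call chain from func.

    Like the script, this assumes the call graph has no recursion.
    """
    best_size, best_path = 0, []
    for callee in CALLS.get(func, []):
        size, path = heaviest_path(callee)
        if size > best_size:
            best_size, best_path = size, path
    return STACK.get(func, 0) + best_size, [func] + best_path


size, path = heaviest_path('main')
print(size, ' -> '.join(path))   # 608 main -> do_transfert -> progress_cb
```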

Let's Encrypt certificate renewal with Gandi LiveDNS API

Tuesday, 02 April 2019
|
Written by
Grégory Soutadé

I have been using Let's Encrypt TLS wildcard certificates for a year now. Everything was fine until the beginning of 2019; since then, there are two domains on my certificate, soutade.fr and *.soutade.fr, and (maybe because of the way I generated the certificate) I need to perform two challenges for renewal: HTTP (http01) and DNS (dns01).

So, I wrote a Python script that performs both:

#!/usr/bin/env python3
#-*- encoding: utf-8 -*-

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

#
# Handle certificate renewal using HTTP and DNS challenges
# DNS challenge performed by Gandi Live v5 API
#

import requests
import os
import argparse
import shutil

# Config
API_KEY = "YOUR-KEY"
LIVEDNS_API = "https://dns.api.gandi.net/api/v5/"
ACME_RECORD = '_acme-challenge'
ACME_CHALLENGE_PATH = '/var/www/.well-known/acme-challenge'

headers = {
    'X-Api-Key': API_KEY,
}

CERTBOT_TOKEN = os.environ.get('CERTBOT_TOKEN', None)
CERTBOT_VALIDATION = os.environ.get('CERTBOT_VALIDATION', None)
DOMAIN = os.environ.get('CERTBOT_DOMAIN', None)

optparser = argparse.ArgumentParser(description='Letsencrypt challenge for Gandi v5 API')
optparser.add_argument('-c', '--cleanup', dest='cleanup',
                       action="store_true", default=False,
                       help='Cleanup challenge')

options = optparser.parse_args()     

if options.cleanup:
    print('Cleanup')
    if os.path.exists(ACME_CHALLENGE_PATH):
        shutil.rmtree(ACME_CHALLENGE_PATH)
else:
    if CERTBOT_TOKEN and CERTBOT_VALIDATION:
        print('Build HTTP authentication')
        # Create token file for web server
        if not os.path.exists(ACME_CHALLENGE_PATH):
            os.makedirs(ACME_CHALLENGE_PATH)
        token_path = os.path.join(ACME_CHALLENGE_PATH, CERTBOT_TOKEN)

        with open(token_path, 'w') as token:
            token.write(CERTBOT_VALIDATION)
        exit(0)

response = requests.get(LIVEDNS_API + "zones", headers=headers)

target_zone = None
if (response.ok):
    zones = response.json()
    for zone in zones:
        if zone['name'] == DOMAIN:
            target_zone = zone
            break
else:
    response.raise_for_status()
    exit(1)

if not target_zone:
    print('No zone found for domain %s' % (DOMAIN))
    exit(1)

domain_records_href = target_zone['zone_records_href']

# Get TXT record
response = requests.get(domain_records_href + "/" + ACME_RECORD, headers=headers)

# Delete record if it exists
if (response.ok):
    requests.delete(domain_records_href + "/" + ACME_RECORD, headers=headers)

if options.cleanup:
    exit(0)

print('Build DNS authentication')
record = {
    "rrset_name": ACME_RECORD,
    "rrset_type": "TXT",
    "rrset_ttl": 300,
    "rrset_values": [CERTBOT_VALIDATION],
    }

response = requests.post(domain_records_href,
                         headers=headers, json=record)

if (response.ok):
    print("DNS token created")
else:
    print("Something went wrong")
    response.raise_for_status()
    exit(1)

A downloadable version is available here.
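The script tells the two challenges apart through the environment variables set by certbot: when both CERTBOT_TOKEN and CERTBOT_VALIDATION are present it writes the HTTP token file, otherwise it goes on to the DNS record. A minimal sketch of that decision, where the `challenge_type` helper and the sample values are hypothetical:

```python
import os


def challenge_type(env=os.environ):
    """Guess which challenge the hook is being asked to answer.

    Mirrors the test done in the script: a token plus a validation string
    means HTTP (http01), anything else is treated as DNS (dns01).
    """
    if env.get('CERTBOT_TOKEN') and env.get('CERTBOT_VALIDATION'):
        return 'http01'
    return 'dns01'


# Made-up values, only to show both branches
print(challenge_type({'CERTBOT_TOKEN': 'abc', 'CERTBOT_VALIDATION': 'abc.def'}))  # http01
print(challenge_type({'CERTBOT_VALIDATION': 'xyz'}))                              # dns01
```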

Crontab

In /etc/crontab :

0  1   1 * *   root   certbot renew  --manual -n --manual-public-ip-logging-ok --manual-auth-hook /root/gandi_letsencrypt.py --manual-cleanup-hook /root/letsencrypt_token_cleanup.sh

Additional scripts

Where /root/letsencrypt_token_cleanup.sh is:

#!/bin/bash

/root/gandi_letsencrypt.py --cleanup

And in /etc/letsencrypt/renewal-hooks/post/:

#!/bin/bash

service nginx restart

Errors

If you get a 404 error with nginx, you may need to add this location block so that requests for the challenge files are not handled by another part of the configuration (or forwarded to another web server):

        location /.well-known/acme-challenge/ {
        }

Live Stock Monitor

Friday, 27 July 2018
|
Written by
Grégory Soutadé

Today, a small Python script to track live stock prices. It fetches data from the boursorama website and formats it for the "Generic Monitor" XFCE applet, which displays the output of a command line script. Just set the path of this script in the genmon properties and set the delay to 60s (to avoid flooding the website).

#!/usr/bin/python

#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
#

import requests
import json

params_gettickseod = {"symbol":"%s","length":"1","period":"0","guid":""}
params_updatecharts = {"symbol":"%s","period":"-1"}

base_headers = {
    'Host': 'www.boursorama.com',
    'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:52.0) Gecko/20100101 Firefox/52.0',
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
    'Accept-Language': 'fr,en-US;q=0.7,en;q=0.3',
    'DNT': '1',
    'Upgrade-Insecure-Requests': '1',
    'Pragma': 'no-cache',
    'Cache-Control': 'no-cache',
}
base_address = 'https://www.boursorama.com/cours/'

headers = {
    'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:52.0) Gecko/20100101 Firefox/52.0',
    'Accept': 'application/json, text/javascript, */*; q=0.01',
    'Accept-Language': 'fr,en-US;q=0.7,en;q=0.3',
    'Accept-Encoding': 'gzip, deflate, br',
    'Referer': 'https://www.boursorama.com/cours/%s/',
    'Content-Type': 'application/json; charset=utf-8',
    'X-Requested-With': 'XMLHttpRequest',
    'DNT': '1',
    'Connection': 'keep-alive',
}

xhr_address = 'https://www.boursorama.com/bourse/action/graph/ws/'
address_gettickseod = xhr_address + 'GetTicksEOD'
address_updatecharts = xhr_address + 'UpdateCharts'

cookies = None

def _do_request(address, params, headers):
    if cookies is None:
        req = requests.get(address, params=params, headers=headers)
    else:
        req = requests.get(address, params=params, headers=headers, cookies=cookies)

    if req.status_code == requests.codes.ok:
        j = req.json()
        if len(j) == 0:
            raise Exception('Not available')
        return j
    else:
        raise Exception("Request error!")

def getStock(stock, display_name=None):
    my_headers = headers.copy()
    my_headers['Referer'] = headers['Referer'] % (stock)

    close_value = 0
    res = ''

    my_params  = params_updatecharts.copy()
    my_params["symbol"] = stock
    try:
        j = _do_request(address_updatecharts, my_params, my_headers)
    except Exception:
        req = requests.get(base_address + stock, headers=base_headers)
        # cookies = req.cookies
        j = _do_request(address_updatecharts, my_params, my_headers)

    current = float(j['d'][0]['c'])
    my_params  = params_gettickseod.copy()
    my_params["symbol"] = stock
    try:
        j = _do_request(address_gettickseod, my_params, my_headers)
        close_value = float(j['d']['qv']['c'])
    except Exception as e:
        if not len(j):
            raise e
        close_value = float(j['d'][0]['o']) # Open value

    if close_value != 0:
        var = ((current/close_value) - 1)*100
    else:
        var = 0
    if current < close_value:
        color = 'red'
        var = -var
    else:
        color = 'green'
    if display_name is not None:
        res += '%s ' % (display_name)
    res += '%.3f <span fgcolor="%s">%.2f</span>' % (current, color, var)

    return res

def getMail():
    res = ''
    nb_messages = ''
    pipew = open("/tmp/gmail-pipe-w", "wb+")
    piper = open("/tmp/gmail-pipe-r", "rb+")
    pipew.write("a\n")
    pipew.flush()
    while not len(nb_messages):
        nb_messages = piper.readline()
    if len(nb_messages):
        nb_messages = int(nb_messages)
        if nb_messages == 1:
            res = ', 1 msg'
        elif nb_messages > 1:
            res = ', %d msgs' % (nb_messages)
    pipew.close()
    piper.close()

    return res

def getStocks(stocks):
    res = ''
    for stock in stocks:
        if res != '': res += ', '
        try:
            res += getStock(*stock)
        except Exception as e:
            if len(stock) > 1:
                res += "%s %s" % (stock[1], str(e))
            else:
                res += str(e)
    res += getMail()
    print('<txt>%s</txt>' % (res))

getStocks([('1rPENX', 'Euronext'), ('1rPAIR',)])

The stock code is the last part of the stock's URL on the website. A file version is available here.
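As a small illustration, the code can be extracted from a stock page URL like this. The `stock_code_from_url` helper is hypothetical and written for Python 3; the URL shape follows the addresses used in the script.

```python
from urllib.parse import urlparse


def stock_code_from_url(url):
    """Return the last path component of a stock page URL."""
    path = urlparse(url).path.rstrip('/')
    return path.rsplit('/', 1)[-1]


print(stock_code_from_url('https://www.boursorama.com/cours/1rPENX/'))  # 1rPENX
```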

I added another part to get the email count from gmail. It relies on a bash script that fetches the RSS feed when data is written to the FIFO.

Body of the script:

#!/bin/bash

USER='soutade'

while [ 1 ] ; do
    echo -n "Please enter gmail account password : "
    read -s password
    echo ""
    echo -n "Confirm password : "
    read -s password2
    echo ""
    if [ "$password" != "$password2" ] ; then
        echo -e "Passwords don't match !!\n"
        continue
    fi
    break
done

pipew="/tmp/gmail-pipe-w"
piper="/tmp/gmail-pipe-r"

rm -f $pipew $piper
mkfifo $pipew $piper

while [ 1 ] ; do
    read line < $pipew
    feeds=`curl -u "$USER:$password" --silent "https://mail.google.com/mail/feed/atom"`
    echo $feeds | sed  s/.*\<fullcount\>//g | sed  s/\<\\/fullcount\>.*//g > $piper
done

You can hardcode the password in the script, but I don't like having my password in clear text on the hard drive. A file version is available here.
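For comparison, the two sed commands that isolate the fullcount value could be written in Python as follows. This is only a sketch: the `unread_count` helper is hypothetical and the sample feed is made up, keeping just the fullcount element the bash script looks for.

```python
import re

# Matches the element the sed commands isolate
FULLCOUNT_RE = re.compile(r'<fullcount>(\d+)</fullcount>')


def unread_count(feed):
    """Return the number found in <fullcount>, or 0 if it is missing."""
    match = FULLCOUNT_RE.search(feed)
    return int(match.group(1)) if match else 0


# Made-up feed excerpt
sample = '<feed><title>Inbox</title><fullcount>3</fullcount></feed>'
print(unread_count(sample))  # 3
```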

Gandi Live DNS v5

Thursday, 26 April 2018
|
Written by
Grégory Soutadé

The very recent release of Pannous was the occasion to create a new subdomain to host the service. The service includes an authentication part, so it has to go through a secure channel (SSL/TLS). It used to be easier, since I could generate my own certificates, including "wildcard" certificates that are valid for all subdomains.

Except that I switched to Let's Encrypt. So I had to wait for version 2 of the protocol (which was delayed) to get this feature. On top of that, the Debian (backport) certbot package got broken along the way, which forced me to go back to an even older version.

Anyway, things are now stable and deployed on the respective servers. One small problem remains: generating a wildcard certificate with Let's Encrypt requires adding a DNS entry (as a challenge). As fate would have it, Gandi's DNS has also evolved and moved to version 5. Its main advantage is that DNS entries are updated immediately (where it used to take several minutes or hours). Another novelty: the Gandi API changes format. The old XML-RPC (which was handy thanks to ready-made Python bindings) is dropped in favour of REST (a bit less formal).

Mix all of this together and you get a nice cocktail, whose recipe is given by Sébastien Blaisot who, to make our lives easier, wrote scripts that generate wildcard certificates through Let's Encrypt. The code is available on GitHub and supports bind (locally), the OVH API and the new Gandi API. All that is left is to clone the repository and run the magic command:

cd certbot-dns-01-authenticators/gandi-livedns
certbot certonly --manual --server https://acme-v02.api.letsencrypt.org/directory\
 --manual-auth-hook $PWD/auth.py --manual-cleanup-hook $PWD/cleanup.py -d '*.soutade.fr'

And there it is, a nice fresh certificate!

So I drew heavily on his code to update my DNS fallback script (a backup server reached through DNS redirection). As a bonus, it moves to Python 3! Eventually, I will have to add IPv6 support.

#!/usr/bin/env python3
# -*- encoding: utf-8 -*-

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import requests
import json
import re

# Config
domain="soutade.fr"
API_KEY = "YOUR-KEY"
livedns_api = "https://dns.api.gandi.net/api/v5/"
dyndns_url = 'http://checkip.dyndns.com/'
headers = {
    'X-Api-Key': API_KEY,
}
A_RECORD_NAME="@" # Record of A type

# Get current IP
current_ip = ''
response = requests.get(dyndns_url)
if response.ok:
    # Raw string avoids invalid escape warnings; match only the address itself
    pattern = re.compile(r'\d+\.\d+\.\d+\.\d+')
    result = pattern.search(response.text)
    if result is None:
        print("No IP found")
        exit(1)
    else:
        current_ip = result.group(0)
else:
    print("Connection error")
    response.raise_for_status()
    exit(1)

print('Your Current IP is %s' % (current_ip))

# Retrieve domains address
response = requests.get(livedns_api + "domains", headers=headers)

if (response.ok):
    domains = response.json()
else:
    response.raise_for_status()
    exit(1)

domain_index = next((index for (index, d) in enumerate(domains) if d["fqdn"] == domain), None)

if domain_index is None:
    # domain not found
    print("The requested domain " + domain + " was not found in this gandi account")
    exit(1)

domain_records_href = domains[domain_index]["domain_records_href"]

# Get recorded IP
response = requests.get(domain_records_href + "/" + A_RECORD_NAME + "/A", headers=headers)

if (response.ok):
    record = response.json()
else:
    print("Failed to look for recorded IP")
    response.raise_for_status()
    exit(1)

print('Old IP : %s' % (record['rrset_values'][0]))

if current_ip != record['rrset_values'][0]:
    record['rrset_values'][0] = current_ip

    # Put updated IP
    response = requests.put(domain_records_href + "/" + A_RECORD_NAME + "/A", headers=headers, json=record)

    if (response.ok):
        print("IP updated")
    else:
        print("something went wrong")
        response.raise_for_status()
        exit(1)

    exit(34) # IP updated return !!

exit(0)

A downloadable version is available here.
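The first step of the script, extracting the public IP from the page returned by the IP check service, can be isolated and tested on its own. Here is a minimal sketch that also validates the address with the standard ipaddress module. The `extract_ipv4` helper is hypothetical, and the sample text is only an assumption about what the response looks like.

```python
import ipaddress
import re

IPV4_RE = re.compile(r'\d{1,3}(?:\.\d{1,3}){3}')


def extract_ipv4(text):
    """Return the first valid dotted IPv4 address found in text, or None."""
    match = IPV4_RE.search(text)
    if match is None:
        return None
    try:
        # Rejects out-of-range values such as 999.1.1.1
        return str(ipaddress.IPv4Address(match.group(0)))
    except ipaddress.AddressValueError:
        return None


# Assumed response shape, with a documentation-range address
print(extract_ipv4('<body>Current IP Address: 203.0.113.7</body>'))  # 203.0.113.7
```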