RSS | technovelty home | page of ian | ian@wienand.org
So, you have some application where you want the user to specify a remote host/port, and you want to support IPv4 and IPv6.
For literal addresses, things are fairly simple. IPv4 addresses need no special treatment, and RFC 2732 covers IPv6 by putting the address within square brackets.
It gets more interesting as to what you should do with hostnames. The problem is that getaddrinfo can return you multiple addresses, but without extra disambiguation from the user it is very difficult to know which one to choose. RFC4472 discusses this, but there does not appear to be any good solution.
Possibly you can do something like ping/ping6 and have a separate program name or configuration flag to choose IPv6. This comes at a cost of transparency.
The glibc implementation of getaddrinfo() puts considerable effort into deciding if you have an IPv6 interface up and running before it will return an IPv6 address. It will even recognise link-local addresses and sort addresses more likely to work to the front of the returned list as described here. However, there is still a small possibility that the IPv6 interface doesn't actually work, and so the library will sort the IPv6 address as first in the returned list when maybe it shouldn't be.
If you are using TCP, you can connect to each address in turn to find one that works. With UDP, however, the connect essentially does nothing.
So I believe probably the best way to handle hostnames for UDP connections, at least on Linux/glibc, is to trust getaddrinfo to return the sanest values first, try a connect on the socket anyway just for extra security and then essentially hope it works. Below is some example code to do that (literal address splitter bit stolen from Python's httplib).
import socket

DEFAULT_PORT = 123

host = '[fe80::21c:a0ff:fb27:7196]:567'

# the port will be anything after the last :
p = host.rfind(":")

# ipv6 literals should have a closing bracket
b = host.rfind("]")

# if the last : is outside the [addr] part (or if we don't have []'s)
if (p > b):
    try:
        port = int(host[p+1:])
    except ValueError:
        print "Non-numeric port"
        raise
    host = host[:p]
else:
    port = DEFAULT_PORT

# now strip off ipv6 []'s if there are any
if host and host[0] == '[' and host[-1] == ']':
    host = host[1:-1]

print "host = <%s>, port = <%d>" % (host, port)

the_socket = None

res = socket.getaddrinfo(host, port, socket.AF_UNSPEC, socket.SOCK_DGRAM)

# go through the returned values in the order getaddrinfo sorted
# them, and use the first one we can connect to.
for r in res:
    af,socktype,proto,cname,sa = r

    try:
        the_socket = socket.socket(af, socktype, proto)
        the_socket.connect(sa)
    except socket.error, e:
        # connect failed! forget this socket and try the next one
        the_socket = None
        continue

    break

if the_socket == None:
    raise socket.error, "Could not get address!"

# ready to send!
the_socket.send("hi!")
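The literal-address splitting can also be pulled out into a small function, which makes it easier to test on its own. This is only a sketch, written for Python 3 rather than the Python 2 of the listing above; the name split_host_port and its default_port argument are made up for illustration.

```python
def split_host_port(hostport, default_port=123):
    """Split 'host', 'host:port' or '[v6addr]:port' into (host, port)."""
    p = hostport.rfind(":")   # a port is whatever follows the last ':'
    b = hostport.rfind("]")   # IPv6 literals end with a closing bracket
    if p > b:
        # the last ':' lies outside any [addr] part, so it starts a port
        port = int(hostport[p + 1:])   # raises ValueError if non-numeric
        host = hostport[:p]
    else:
        host, port = hostport, default_port
    # strip the brackets from an IPv6 literal
    if host.startswith("[") and host.endswith("]"):
        host = host[1:-1]
    return host, port
```

Note that an IPv6 literal without brackets (say ::1) is still ambiguous here, since the text after its last colon is taken as the port; that is exactly why the bracket convention exists.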
posted at: Tue, 16 Mar 2010 15:54 | in /code/python
I'm not sure how to describe this best, but numbers2ppm.py is a Python script to turn a list of numbers into a (plain format) PPM image filled with coloured boxes. Perhaps an example is best.
$ cat test.in
01234567899876543210
$ ./numbers2ppm.py -W 40 -H 40 -c 10 ./test.in > test.ppm
$ convert test.ppm test.png
You should end up with
If you're me, you could use something like this to read a dump of reference counts of physical frames of memory dumped from the kernel, creating a nice graphical view of memory usage and sharing. I imagine it may come in handy for other things too.
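To give a feel for the idea, here is a rough Python 3 sketch of turning digits into a plain-format PPM. It is not the numbers2ppm.py script itself: the function name digits_to_ppm, the blue-to-red palette and the box layout are all assumptions made for the example.

```python
def digits_to_ppm(digits, box_w=4, box_h=4, per_row=10):
    """Return plain-format (P3) PPM text with one coloured box per digit."""
    # Assumed palette: 0 is blue, 9 is (almost) red, with a ramp in between.
    palette = {str(d): (d * 28, 0, 255 - d * 28) for d in range(10)}
    rows = [digits[i:i + per_row] for i in range(0, len(digits), per_row)]
    width, height = per_row * box_w, len(rows) * box_h
    # Header: magic number, dimensions, maximum colour value.
    out = ["P3", "%d %d" % (width, height), "255"]
    for row in rows:
        # One scanline for this row of boxes; a short last row is padded with 0.
        scanline = []
        for ch in row.ljust(per_row, "0"):
            scanline.extend(["%d %d %d" % palette[ch]] * box_w)
        # Repeat the scanline box_h times to give the boxes their height.
        out.extend(scanline * box_h)
    return "\n".join(out) + "\n"
```

Writing the returned text to a file gives something that convert (or any PPM viewer) should be able to read, one pixel per line.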
posted at: Fri, 10 Feb 2006 21:32 | in /code/python
I'm really not sure if there is an easier way to do this, but here
is my newly most-used utility. It puts two files beside each other;
but as opposed to sdiff/diff -y doesn't analyse it, and
as opposed to paste keeps to fixed widths. Here is a
screen shot.
$ python ./side-by-side.py --width 40 /tmp/afile.txt /tmp/afile2.txt
this is a file                           | i have here another file
that has lines of                        | that also has some text and
text. to read                            | some lines. although it
                                         | is slightly longer than the other
this is a really really really really re * file with all these words
                                         | in
                                         | it
I'd love to hear from all the Python freaks how you could get the LOC even lower; every time I do something like this I find out about a new, quicker, way to recurse a list :)
#!/usr/bin/python

import sys, os
from optparse import OptionParser

class InFile:
    def __init__(self, filename):
        try:
            self.lines=[]
            self.maxlen = 0
            for l in open(filename).readlines():
                self.lines.append(l.rstrip())
        except IOError, (error, message):
            print "Can't read input %s : %s" % (filename, message)
            sys.exit(1)

        self.nlines = len(self.lines)
        if self.nlines == 0:
            self.lines.append("")
        self.maxlen = max(map(len, self.lines))

    # pad to the max len, with an extra space then the delimiter
    def pad_lines(self, nlines, width=0, nodiv=False, notrunc=False):
        if width == 0:
            width = self.maxlen
        pad = []
        for i in range(0, width):
            pad += " "
        # add on some extra for the divider and spaces
        pad += "   "
        padlen = len(pad)

        for i in range(0, nlines):
            try:
                linelen = len(self.lines[i])
            except IndexError:
                self.lines.append("")
                linelen = 0

            if (linelen > width):
                linelen = width
                if not notrunc:
                    pad[-2] = "*"
            elif nodiv:
                pad[-2] = " "
            else:
                pad[-2] = "|"
            self.lines[i] = self.lines[i][:linelen] + "".join(pad[linelen - padlen:])

usage= "side-by-side [-w width] file1 file2 ... filen"
parser = OptionParser(usage, version=".1")
parser.add_option("-w", "--width", dest="width", action="store", type="int",
                  help="Set fixed width for each file", default=0)
parser.add_option("--last-div", dest="lastdiv", action="store_true",
                  help="Print divider after last file", default=False)
parser.add_option("--no-div", dest="nodiv", action="store_true",
                  help="Don't print any divider characters", default=False)
parser.add_option("--no-trunc", dest="notrunc", action="store_true",
                  help="Don't show truncation with a '*'", default=False)

(options, args) = parser.parse_args()

flist = []
if (len(args) == 0):
    print usage
    sys.exit(1)

for f in args:
    flist.append(InFile(f))

max_lines = max(map(lambda f: f.nlines, flist))

for i in range(0,len(flist)):
    if (len(flist)-1 == i):
        options.nodiv = not options.lastdiv
    flist[i].pad_lines(max_lines, options.width, options.nodiv, options.notrunc)

for l in range(0, max_lines):
    for f in flist:
        print f.lines[l],
    print
update: Leon suggests
pr -Tm file1 file2
Which is pretty close, but doesn't seem to put any divider between
the files. Still might be a handy tool for your toolbox. It seems,
from the pr man page, the word for doing this sort of
thing is columnate.
update 2: Told you I'd learn new ways to iterate! Stephen Thorne came up with a neat solution. Some extra tricks he used:
' ' * n gives you n spaces. Pretty obvious when you think of it!
The yield statement.
The itertools package, with its modified zip-like izip function.
All very handy tips for your Python toolbox. If you're learning Python I'd recommend solving this problem, as you can really put to use some of the niceties of the language.
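To show how those three tricks fit together, here is a small Python 3 sketch. It is not Stephen's solution, just an illustration; side_by_side and its arguments are names invented here, and itertools.zip_longest stands in for izip because it keeps going until the longest input runs out.

```python
from itertools import zip_longest

def side_by_side(columns, width, divider=" | "):
    """Yield one output line per row, each column padded to a fixed width."""
    # zip_longest pairs up the n-th line of every column, filling gaps with "".
    for row in zip_longest(*columns, fillvalue=""):
        cells = []
        for cell in row:
            cell = cell[:width]                              # truncate long lines
            cells.append(cell + " " * (width - len(cell)))   # ' ' * n padding
        # yield makes this a generator, so rows are produced lazily.
        yield divider.join(cells).rstrip()
```

Feeding it lists of lines, for example from open(name).read().splitlines(), and printing each yielded row gives roughly the same effect as the script above, without the truncation marker.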
posted at: Wed, 12 Oct 2005 15:04 | in /code/python
Here is a Python script to convert IP addresses into hexadecimal, which may be required to name files for your bootloader if you are trying to netboot, for example. You can specify a mask if you have a large group of machines on a network (e.g. 10.1.3.2 with a mask of 24 will just give you 0x0A == 10d, while a mask of 16 gives you 0x0A01).
import re
import sys
import socket

if (not len(sys.argv) == 2):
    print "Usage: ip2hex.py hostname|ip address/mask"
    sys.exit(1)

try:
    (in_str, mask) = sys.argv[1].split("/")
    # sanity check mask
    mask = int(mask)
    if (mask > 32 or mask < 0):
        print "Mask out of range"
        sys.exit(1)
except ValueError:
    mask = 0
    in_str = sys.argv[1]

try:
    ip_addr = socket.gethostbyname(in_str)
except:
    print "Invalid address!"
    sys.exit(1)

#gethostbyname really checks this for us, but you never know
ip_regex = re.compile('(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.' \
                      '(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.' \
                      '(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.' \
                      '(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)')
ip_match = ip_regex.match(ip_addr)

if (ip_match == None):
    print "Invalid address"
    sys.exit(1)

hex_ip_addr = 0
for i in range(1,5):
    hex_ip_addr += int(ip_match.group(i)) << (4-i)*8

fmt = "%%0%dX" % ((32 - mask) / 4)
print fmt % (hex_ip_addr >> mask)
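The core of the conversion also fits in a few lines if you let the socket and struct modules do the parsing. This is a Python 3 sketch rather than a replacement for the script; the name ip_to_hex is made up, and it keeps the script's convention that the mask is the number of low bits shifted away.

```python
import socket
import struct

def ip_to_hex(addr, mask=0):
    """Return a dotted-quad IPv4 address as upper-case hex, dropping `mask` low bits."""
    # inet_aton gives the 4 packed bytes; "!I" reads them as a big-endian integer.
    (value,) = struct.unpack("!I", socket.inet_aton(addr))
    digits = (32 - mask) // 4          # integer division: one hex digit per 4 bits
    return "%0*X" % (digits, value >> mask)
```

With the example from the text, ip_to_hex("10.1.3.2", 24) gives 0A and a mask of 16 gives 0A01.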
posted at: Mon, 25 Jul 2005 16:51 | in /code/python
Asterisks have lots of meaning in Python. Firstly, consider in a function definition
>>> def function(arg, *vargs, **kargs):
        print arg
        print vargs
        print kargs

>>> function(1, 2,3,4,5, test1="abc", test2="def")
1
(2, 3, 4, 5)
{'test1': 'abc', 'test2': 'def'}
*vargs puts all left-over non-keyword arguments into a tuple called vargs.
**kargs puts all left-over keyword arguments into a dictionary called kargs.
On the other hand, you can use the asterisk with a tuple when calling a function to expand out the elements of the tuple into positional arguments
>>> def function(arg1, arg2, arg3):
        print arg1, arg2, arg3

>>> args = (1,2,3)
>>> function(*args)
1 2 3
You can do a similar thing with keyword arguments and a dictionary with the double asterisk operator
>>> def function(arg1=None, arg2=None):
        print arg1, arg2

>>> dict = {"arg1":"1", "arg2":"2"}
>>> function(**dict)
1 2
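The two uses are often combined: collect whatever arguments arrive, then expand them again when calling another function. A minimal Python 3 sketch, where describe and forward are names invented for the example:

```python
def describe(arg, *vargs, **kargs):
    """Return the arguments exactly as the function received them."""
    return arg, vargs, kargs

def forward(func, *vargs, **kargs):
    """Pass every argument through to func unchanged."""
    # Collected into a tuple and a dict on the way in,
    # expanded back into separate arguments on the way out.
    return func(*vargs, **kargs)
```

This is the usual shape of a wrapper or decorator that should not care what signature the wrapped function has.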
posted at: Tue, 05 Apr 2005 14:04 | in /code/python
In a previous
post I mentioned importing a python file as a config file via
ihooks.
Craig Ringer sent me a note, pasted below
I think there's a considerably simpler approach than the one you've taken, as well as an alternative to ihooks that's quite a bit tidier to use.
First, the simple alternative. The `import' statement pretty much just calls __builtin__.__import__(), and you're quite free to use that in your code directly. This lets you do something like this for your config importer (based off your posted code):
import os
import __builtin__

try:
    config = __builtin__.__import__('config', globals(), {}, [])
except Exception:
    # We catch any Exception because the user could do all sorts
    # of things wrong in the import. We can't even guarantee that
    # an ImportError comes from our __import__ call and not an import
    # attempt inside the config file without delving into the backtrace
    # using the traceback module.
    print "An error occurred while importing the config file."
    print "Traceback follows."
    raise

def config_example():
    print config.some_config_dictionary["option"]

if __name__ == "__main__":
    config_example()
It's worth noting that if I recall correctly the __builtin__ should be used; __builtins__ is specific to the CPython implementation.
If you start having to accept an explicit path for the config file from the user or something then ihooks is probably better. For simple cases, __builtin__.__import__() is much quicker and easier to understand though.
If you do decide you need more than __builtin__.__import__() has to offer, it may be better to look into the PEP 302 import hooks, which are much cleaner and simpler than the rather ancient ihooks module. I should probably submit a patch to the ihook docs to make them refer to PEP 302, actually.
Some info on PEP 302: http://www.python.org/doc/2.3.5/whatsnew/section-pep302.html
Indeed, __import__ was my first attempt at doing what
I wanted; but it is a little off the semantics I wanted. I wanted to
be able to specify a file on the command line with an argument
--config=/path/to/file/config.py and have that loaded in.
For example, __import__("/path/to/config.py") doesn't
work.
I did look into import hooks, and they are a possibility, but these also don't quite give the semantics I wanted. Here's a minimal example of how it might work
import sys
import imp
import os

path_to_config_file = "./testdir/config.py"

class Config:
    def __init__(self, path):
        print path
        self.path = path

    def _get_code(self, fullname):
        # path_to_config_file might have come in via our options, etc
        f = open(path_to_config_file)
        # read in the file and compile it
        c = ""
        for line in f.readlines():
            c = c + line
        code = compile(c, fullname, 'exec')
        # false means this is not a package, code is the compiled code
        return False, code

    def find_module(self, fullname):
        # sufficient to return ourselves
        print fullname
        return self

    def load_module(self, fullname):
        # get the code
        ispkg, code = self._get_code(fullname)
        # create a new module
        mod = imp.new_module(fullname)
        # add it to the modules list
        sys.modules[fullname] = mod
        # set the file
        mod.__file__ = "<%s>" % (path_to_config_file)
        # set the module loader
        mod.__loader__ = self
        # exec code
        exec code in mod.__dict__
        # return the module object
        return mod

sys.path_hooks.append(Config)

import aconfigfile
print aconfigfile
aconfigfile.hello()
The issue I see with this is that the import
aconfigfile isn't really clearly showing what I want. I really
want to be able to pass any arbitrary file name to be imported,
e.g. something like import ./a/config/config.file. As
far as I can see, import hooks are more designed to import files held
in, say, a zip or pak archive.
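For what it's worth, Python 3 releases (much later than the Python this post was written against) ship importlib.util, which gets close to the "load this exact file" semantics. A sketch, assuming the file has a .py extension; the function name load_config and the module name user_config are made up for the example.

```python
import importlib.util

def load_config(path, name="user_config"):
    """Load the Python file at `path` as a module object and return it."""
    spec = importlib.util.spec_from_file_location(name, path)
    if spec is None:
        # No loader could be worked out from the file name (e.g. odd extension).
        raise ImportError("cannot load config from %r" % path)
    module = importlib.util.module_from_spec(spec)
    # Run the file's code inside the new module's namespace.
    spec.loader.exec_module(module)
    return module
```

The path can then come straight from a --config=/path/to/file/config.py style option, and the settings are read as attributes of the returned module.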
posted at: Mon, 07 Mar 2005 17:16 | in /code/python
In some circumstances you might like to have the configuration file
for your Python program actually be another Python program, especially
if you want your users to be able to write really cool config files.
At first glance the eval() function looks like a nice
way to do this, but unfortunately it won't work because
import isn't an expression, it's a statement.
>>> config_file = "config"
>>> eval("import " + config_file)
Traceback (most recent call last):
  File "<stdin>", line 1, in ?
  File "<string>", line 1
    import config
         ^
SyntaxError: invalid syntax
You can get around this however with the ihooks
library. First, create a config file called config.py
with the following.
class Config:
    some_config_dictionary = {"option" : "Hello Config"}
Then you can import the config as per the following
import ihooks, os

def import_module(filename):
    loader = ihooks.BasicModuleLoader()
    path, file = os.path.split(filename)
    name, ext = os.path.splitext(file)
    module = loader.find_module_in_dir(name, path)
    if not module:
        raise ImportError, name
    module = loader.load_module(name, module)
    return module

def config_example():
    config = import_module("config.py").Config
    print config.some_config_dictionary["option"]

if __name__ == "__main__":
    config_example()
All going well, you should print out Hello
Config
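The expression-versus-statement distinction is easy to check for yourself. The sketch below is Python 3 and uses importlib.import_module from the standard library as the function-call way to import by name; the two helper names are invented for the example.

```python
import importlib

def eval_import_fails(name):
    """Return True if eval() rejects 'import <name>' as invalid syntax."""
    try:
        eval("import " + name)   # eval only accepts expressions
    except SyntaxError:
        return True
    return False

def import_by_name(name):
    """Import a module whose name is only known at run time."""
    return importlib.import_module(name)
```

import_by_name("config") only finds modules on the normal search path, so it does not by itself solve loading a file from an arbitrary location.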
posted at: Wed, 02 Mar 2005 20:18 | in /code/python
Generally, when passing the size of something around it's good to
use size_t and, should that something be a blob (in the
binary object type sense) it probably wants to be a
However, the cstring extension to SWIG uses
int as sizes and char* for data, for example
%cstring_output_withsize(parm, maxparm): This macro is used to handle bounded character output functions where both a char * and a pointer int * are passed. Initially, the int * parameter points to a value containing the maximum size. On return, this value is assumed to contain the actual number of bytes. As input, a user simply supplies the maximum length. The output value is a string that may contain binary data.
You could potentially create your own typemaps to handle this and re-write large parts of
the cstring SWIG interface, but the
point would be moot because by the time it got back to the Python API
it has to be an int anyway; e.g. calls like
PyObject* PyString_FromStringAndSize(const char *v, int
len) all take an int. Since Python supports
binary strings everything should be a char* too (this is
less critical, but if you want to build with -Wall
-Werror as you should, you'll need to make sure the types are
right).
I would recommend not following some of the SWIG
instructions about doing your own typedef for size_t.
This seems fraught with danger and you're only going to be calling
Python API functions that expect an int anyway. Be aware
that if you really have a need to be passing something around with a
size that doesn't fit in an int, you'll have some work to
do; otherwise design your API with the right types for the Python
API.
posted at: Thu, 24 Feb 2005 12:03 | in /code/python
The SocketServer module is very nifty, but the inbuilt
documentation is a bit obscure. Below is a simple SocketServer
application that simply listens on port 7000; run it and telnet
localhost 7000 and you should be greeted by a message; type
HELLO and you should get a message back, and
QUIT should end the whole thing.
import SocketServer, time, select, sys
from threading import Thread

COMMAND_HELLO = 1
COMMAND_QUIT = 2

# The SimpleRequestHandler class uses this to parse command lines.
class SimpleCommandProcessor:
    def __init__(self):
        pass

    def process(self, line, request):
        """Process a command"""
        args = line.split(' ')
        command = args[0].lower()
        args = args[1:]

        if command == 'hello':
            request.send('HELLO TO YOU TO!\n\r')
            return COMMAND_HELLO
        elif command == 'quit':
            request.send('OK, SEE YOU LATER\n\r')
            return COMMAND_QUIT
        else:
            request.send('Unknown command: "%s"\n\r' % command)

# SimpleServer extends the TCPServer, using the threading mix in
# to create a new thread for every request.
class SimpleServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):

    # This means the main server will not do the equivalent of a
    # pthread_join() on the new threads. With this set, Ctrl-C will
    # kill the server reliably.
    daemon_threads = True

    # By setting this we allow the server to re-bind to the address by
    # setting SO_REUSEADDR, meaning you don't have to wait for
    # timeouts when you kill the server and the sockets don't get
    # closed down correctly.
    allow_reuse_address = True

    def __init__(self, server_address, RequestHandlerClass, processor, message=''):
        SocketServer.TCPServer.__init__(self, server_address, RequestHandlerClass)
        self.processor = processor
        self.message = message

# The RequestHandler handles an incoming request. We have extended in
# the SimpleServer class to have a 'processor' argument which we can
# access via the passed in server argument, but we could have stuffed
# all the processing in here too.
class SimpleRequestHandler(SocketServer.BaseRequestHandler):

    def __init__(self, request, client_address, server):
        SocketServer.BaseRequestHandler.__init__(self, request, client_address, server)

    def handle(self):
        self.request.send(self.server.message)

        ready_to_read, ready_to_write, in_error = select.select([self.request], [], [], None)

        text = ''
        done = False
        while not done:

            if len(ready_to_read) == 1 and ready_to_read[0] == self.request:
                data = self.request.recv(1024)

                if not data:
                    break
                elif len(data) > 0:
                    text += str(data)

                    while text.find("\n") != -1:
                        line, text = text.split("\n", 1)
                        line = line.rstrip()

                        command = self.server.processor.process(line,
                                                                self.request)

                        if command == COMMAND_HELLO:
                            break
                        elif command == COMMAND_QUIT:
                            done = True
                            break

        self.request.close()

    def finish(self):
        """Nothing"""

def runSimpleServer():
    # Start up a server on localhost, port 7000; each time a new
    # request comes in it will be handled by a SimpleRequestHandler
    # class; we pass in a SimpleCommandProcessor class that will be
    # able to be accessed in request handlers via server.processor;
    # and a hello message.
    server = SimpleServer(('', 7000), SimpleRequestHandler,
                          SimpleCommandProcessor(), 'Welcome to the SimpleServer.\n\r')

    try:
        server.serve_forever()
    except KeyboardInterrupt:
        sys.exit(0)

if __name__ == '__main__':
    runSimpleServer()
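For comparison, here is roughly how the same idea might look with the Python 3 socketserver module. It is a sketch, not a port of the listing above: StreamRequestHandler does the line buffering, and the command handling is split into a plain function (process_line, a made-up name) so it can be tried without opening a socket.

```python
import socketserver

def process_line(line):
    """Return (reply, keep_going) for one command line."""
    command = line.strip().split(" ")[0].lower()
    if command == "hello":
        return "HELLO TO YOU TOO!\r\n", True
    if command == "quit":
        return "OK, SEE YOU LATER\r\n", False
    return 'Unknown command: "%s"\r\n' % command, True

class LineHandler(socketserver.StreamRequestHandler):
    def handle(self):
        self.wfile.write(b"Welcome to the SimpleServer.\r\n")
        # rfile is a buffered file object; iterating yields one line at a time.
        for raw in self.rfile:
            reply, keep_going = process_line(raw.decode("utf-8", "replace"))
            self.wfile.write(reply.encode("utf-8"))
            if not keep_going:
                break

class SimpleServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    daemon_threads = True          # don't wait for handler threads on exit
    allow_reuse_address = True     # set SO_REUSEADDR so restarts can re-bind

def run(port=7000):
    """Serve until interrupted; nothing calls this automatically."""
    with SimpleServer(("", port), LineHandler) as server:
        server.serve_forever()
```

Calling run() and then telnet localhost 7000 should behave much like the original: a greeting, a reply to HELLO, and a closed connection after QUIT.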
posted at: Sat, 19 Feb 2005 13:08 | in /code/python
You might find yourself in the following situation: you have to use separate processes, because Python doesn't really support sending signals with multiple threads, but you need some sort of mutual exclusion between these processes. Your additional Iron Python ingredient is that you don't really want to have dependencies on any external modules not shipped with Python that implement SysV IPC or shared objects (though this is probably the most correct solution).
If you're a Python programmer and not a C programmer, you may not realise that mmap can map memory anonymously, as it's not mentioned in the help. In this case, anonymously means that the mapping is not backed by a file, but by system memory. This doesn't seem to be covered in the Python documentation, but it will work on almost any reasonable Unix system.
Here is a Half Baked Mutex based on anonymous shared memory. This version really is half baked, because it uses the "Bakery Algorithm" designed by Lamport (which Peter Chubb taught me about in distributed systems). It differs slightly though -- our maximum ticket number must be less than 256 because of interactions between the python mmap, which treats the mmaped area as a string and the ascii character set (via ord and chr). This means heavily contested locks will be a bit "jerky" as the system waits to free up a few slots before continuing. We have a smattering of calls to sleep to attempt to reduce busy waiting. The only other caveat is "hashing" the PID down to a 1024 byte array -- a collision would probably be fatal to the algorithm.
It's not perfect, but something like it might get you out of a bind.
import os
import sys
from time import sleep

class HalfBakedMutex:
    def __init__(self):
        import mmap
        #C is an array of people waiting for a ticket
        self.C = mmap.mmap(-1, 1024,
                           mmap.MAP_SHARED|mmap.MAP_ANONYMOUS)
        #N is list of tickets people are holding
        self.N = mmap.mmap(-1, 1024,
                           mmap.MAP_SHARED|mmap.MAP_ANONYMOUS)

    def take_a_number(self):
        #pick a number to "see the baker"
        i = os.getpid() % 1024

        #find current maximum number
        while True:
            #indicate we are currently getting a number
            self.C[i] = chr(1)

            max = 0
            for j in range(1024):
                if (ord(self.N[j])) > max:
                    max = ord(self.N[j])
            #we can't have max > 256 as chr(256) will fail
            if (max + 1 < 256):
                break
            else:
                self.C[i] = chr(0)
                sleep(0.1)

        #take next maximum
        self.N[i] = chr(max + 1)
        self.C[i] = chr(0)

    def lock(self):
        #first, take a number to see the baker
        self.take_a_number()

        i = os.getpid() % 1024

        for j in range(1024):
            #make sure the process isn't currently getting a ticket
            while ord(self.C[j]) == 1:
                sleep(0.1)
                continue

            # If process j has a ticket, i.e.
            #    N[j] > 0
            # AND either the process has a lower ticket, or the same
            # ticket and a lower PID, i.e.
            #   (N[j],j) < (N[i],i)
            # wait for it to run
            while (ord(self.N[j]) > 0) and (ord(self.N[j]),j) < (ord(self.N[i]),i) :
                sleep(0.1)
                continue

        #if we made it here, it is our turn to run!
        return

    def unlock(self):
        i = os.getpid() % 1024
        self.N[i] = chr(0)

mut = HalfBakedMutex()

os.fork()
os.fork() # 4 processes
os.fork() # 8 processes
os.fork() # 16 processes
os.fork() # 32 processes
os.fork() # 64 processes

while True:
    mut.lock()
    print(" ------ " + `os.getpid()` + " ------ ")
    mut.unlock()
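The part the whole thing hangs on is that an anonymous mapping created before a fork really is shared with the children. Here is a tiny Python 3 sketch of just that, assuming a Unix system with os.fork; the function name shared_byte_demo is made up. In Python 3 the mapping is indexed with integers, so the ord and chr juggling goes away.

```python
import mmap
import os

def shared_byte_demo():
    """Write a byte in a child process and read it back in the parent."""
    # fileno -1 asks for an anonymous mapping; on Unix it is shared by default.
    shared = mmap.mmap(-1, 1)
    pid = os.fork()
    if pid == 0:
        # Child: store a value in the shared byte, then exit immediately.
        shared[0] = 42
        os._exit(0)
    # Parent: wait for the child, then read what it wrote.
    os.waitpid(pid, 0)
    return shared[0]
```

If the mapping were private rather than shared, the parent would still see 0 here.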
posted at: Wed, 16 Feb 2005 10:33 | in /code/python

This work is licensed under a Creative Commons Attribution-ShareAlike 2.5 License.