Source code for feedr.feedr

# TODO: (FEAT) support for different statistical distributions
# TODO: (FEAT) support throughput testing thru http://linux.die.net/man/1/pv
# TODO: (IMPRV) remove six and dictconfig from coverage report


import logging
import logger
import os
import sys
from time import sleep
import datetime
import random

# import transports and formatters
import transports as trans
import formatters as forms

# import fakers
from faker import Factory
import format_mappings as fm


DEFAULT_BASE_LOGGING_LEVEL = logging.INFO
DEFAULT_VERBOSE_LOGGING_LEVEL = logging.DEBUG

DEFAULT_CONFIG_FILE = 'config.py'
DEFAULT_GAP = 0.01

DEFAULT_TRANSPORT = 'File'
DEFAULT_FORMATTER = 'Custom'

lgr = logger.init()


def _set_global_verbosity_level(is_verbose_output=False):
    """sets the global verbosity level for console and the lgr logger.

    :param bool is_verbose_output: should be output be verbose
    """
    global verbose_output
    # TODO: (IMPRV) only raise exceptions in verbose mode
    verbose_output = is_verbose_output
    if verbose_output:
        lgr.setLevel(logging.DEBUG)
    else:
        lgr.setLevel(logging.INFO)
    # print 'level is: ' + str(lgr.getEffectiveLevel())


def _import_config(config_file):
    """returns a Feedr configuration object

    :param string config_file: path to config file
    """
    # get config file path
    config_file = config_file or os.path.join(os.getcwd(), DEFAULT_CONFIG_FILE)
    lgr.debug('config file is: {0}'.format(config_file))
    # append to path for importing
    sys.path.append(os.path.dirname(config_file))
    try:
        lgr.debug('importing generator dict...')
        return __import__(os.path.basename(os.path.splitext(
            config_file)[0])).GENERATOR
    # TODO: (IMPRV) remove from path after importing
    except ImportError:
        lgr.warning('config file not found: {0}.'.format(config_file))
        raise FeedrError('missing config file')
    except SyntaxError:
        lgr.error('config file syntax is malformatted. please fix '
                  'any syntax errors you might have and try again.')
        raise FeedrError('bad config file')


[docs]def get_current_time(): """returns the current time (no microseconds tho)""" return datetime.datetime.now().replace(microsecond=0)
[docs]def calculate_throughput(elapsed_time, messages): """calculates throughput and extracts the number of seconds for the run from the elapsed time :param elapsed_time: run time :param int messages: number of messages to write :return: throughput and seconds :rtype: tuple """ ftr = [3600, 60, 1] seconds = sum([a * b for a, b in zip( ftr, [int(i) for i in str(elapsed_time).split(":")])]) try: return messages / seconds, seconds except ZeroDivisionError: lgr.warning('caluclating throughput for less-than-a-second ' 'runs is currently not supported. come back soon..') return 'Unknown', seconds
[docs]def send(instance, client, formatter, format_config, messages, gap, batch): """sends data and prints the time it took to send it :param instance: transport class instance :param client: client to use to send send :param string format: formatter to use :param dict format_config: formatter configuration to use :param int messages: number of messages to send :param float gap: gap in seconds between 2 messages :param int batch: number of messages per batch """ message_count = 0 lgr.debug('configuring formatter...') # get formatter instance # TODO: (IMPRV) move formatter instance definition to function inside # TODO: (IMPRV) the current function and add _ if hasattr(forms, formatter): formatter_instance = getattr(forms, formatter)(format_config) else: lgr.error('could not find formatter: {0}. please make sure the ' 'formatter you\'re calling exists.'.format(formatter)) raise FeedrError('missing formatter') # and get the current time start_time = get_current_time() lgr.debug('start time is: {0}'.format(start_time)) lgr.info('transporting data... EN GARDE!') while True: # generate the data from the formatter data = [formatter_instance.generate_data() for i in xrange(batch)] # and send the data through the relevant transport instance.send(client, data) message_count += batch # check if the number of messages sent are less than the desired amount if message_count < messages or messages == 0: # and sleep the desired amount of time.. zzz zz zZZ zZZzzzz sleep(gap) else: break # just to get some feedback during execution if not message_count % (1 / gap): lgr.info('{0} data pieces written. NICE!'.format(message_count)) # then get the current time once more end_time = get_current_time() lgr.debug('end time is: {0}'.format(end_time)) # and the elapsed time elapsed_time = end_time - start_time # meH! throughput, seconds = calculate_throughput(elapsed_time, messages) # TODO: (FEAT) add the option to send the throughput as well to benchmark # TODO: (FEAT) the transport process itself. lgr.info('DONE! (after {0}h ({1} seconds) with ' 'throughput: {2} pieces/sec. now you can go for coffee.)'.format( elapsed_time, seconds, throughput)) try: # create a pretty table to write the statistical data to # TODO: (IMPRV) move this to generator function. data = instance.get_data() lgr.info('statistical data:\n {0}'.format(data)) except AttributeError: lgr.debug( 'statistical data not implemented for chosen transport.') # TODO: (IMPRV) why is this here? return
[docs]def config_transport(transports, transport, transport_config): """returns a configured instance and client for the transport :param transports: transport classes to choose from. :param string transport: transport to use :param dict transport_config: transport configuration """ lgr.debug('configuring transport...') # get transport instance if hasattr(transports, transport): transport_instance = getattr(transports, transport)(transport_config) else: lgr.error('could not find transport: {0}. please make sure the ' 'transport you\'re calling exists.'.format(transport)) raise FeedrError('missing transport: {0}'.format(transport)) # get transport client client = transport_instance.configure() return transport_instance, client
[docs]def generator(config=None, transport=None, formatter=None, gap=None, messages=0, batch=1, verbose=False): """generates data this will generate data in the requested format and protocol. :param string config: path to config file path :param string transport: transport type to use :param string formatter: formatter to use :param float gap: gap in seconds between 2 messages :param int messages: number of messages to send :param int batch: number of messages to stack before sending :param bool verbose: sets verbose state for internal logging. """ def return_real(gap, messages, batch): return float(gap), abs(int(messages)), int(batch) # set verbosity level for internal logging _set_global_verbosity_level(verbose) # set params for basic Feedr configuration # import config file transport = transport if transport else DEFAULT_TRANSPORT formatter = formatter if formatter else DEFAULT_FORMATTER config = _import_config(config) if config else {} gap, msgs, batch = return_real( gap if gap else DEFAULT_GAP, messages if messages else 0, batch if batch else 1 ) # TODO: (IMPRV) move config to different function. # declare transport and formatter configuration. will assume defaults # if config file wasn't imported. all_transports = config.get('transports', {}) \ if config else {} all_formatters = config.get('formatters', {}) \ if config else {} transport_config = all_transports.get(transport, {}) \ if all_transports.get(transport) else {} formatter_config = all_formatters.get(formatter, {}) \ if all_formatters.get(formatter) else {} transport = transport_config.get('type', transport) \ if transport_config else transport formatter = formatter_config.get('type', formatter) \ if formatter_config else formatter lgr.debug('transport: {0}'.format(transport)) lgr.debug('formatter: {0}'.format(formatter)) lgr.debug('gap: {0}'.format(gap)) lgr.debug('message count: {0}'.format(msgs if msgs > 0 else 'infinite')) # well.. you can't have that right? that would be stupid. if msgs > 0 and batch > msgs: raise FeedrError('batch number larger than total amount of messages') else: lgr.debug('batch: {0}'.format(batch)) # define transport class instance and shipping client instance, client = config_transport( trans, transport + 'Transport', transport_config) # send the stuff send(instance, client, formatter + 'Formatter', formatter_config, msgs, gap, batch) # maybe close a connection to the host is required... try: instance.close() except AttributeError: lgr.debug('connection closing not implemented for chosen transport.')
[docs]def list_fake_types(): """prints a list of random data types with an example""" fake = Factory.create() ignore_list = [ 'add_provider', 'format', 'get_formatter', 'set_formatter', 'parse', 'provider', 'providers', 'get_providers', ] fakes_list = [] # yes, the following is sort of disgusting. will have to find a better way # to implement this... # list from fake-factory for fake_type in dir(fake): if not fake_type.startswith('_') and fake_type not in ignore_list: fakes_list.append('*** {0} ({1})'.format( fake_type, getattr(fake, fake_type)())) # list from format mappings default handler for fake_type, data in fm.DATA.items(): fakes_list.append('*** {0} ({1})'.format( fake_type, random.choice(data))) # list from format mappings additional handlers for fake_type in dir(fm.InHouseFaker): if not fake_type.startswith('_') and not fake_type == 'default': fakes_list.append('*** {0} ({1})'.format( fake_type, getattr(fm.InHouseFaker(), fake_type)())) print("\n".join(fakes_list))
[docs]def list_transports(): for transport in dir(trans): if 'Transport' in transport: print('*** {0}'.format(transport.replace('Transport', '')))
[docs]def list_formatters(): for formatter in dir(forms): if 'Formatter' in formatter: print('*** {0}'.format(formatter.replace('Formatter', '')))
[docs]class FeedrError(Exception): pass