Logo Search packages:      
Sourcecode: fair version File versions  Download package

worker.c

/******************************************************************************

      worker.c -- bookkeeping for known transponder nodes
      Copyright (C) 2004  Wessel Dankers <wsl@uvt.nl>

      This program is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published by
      the Free Software Foundation; either version 2 of the License, or
      (at your option) any later version.

      This program is distributed in the hope that it will be useful,
      but WITHOUT ANY WARRANTY; without even the implied warranty of
      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      GNU General Public License for more details.

      You should have received a copy of the GNU General Public License
      along with this program; if not, write to the Free Software
      Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA

      $Id: worker.c 23144 2007-12-21 15:36:48Z wsl $
      $URL: https://infix.uvt.nl/its-id/trunk/sources/fair-0.5.1/src/worker.c $

******************************************************************************/

#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <regex.h>
#include <sys/types.h>
#include <sys/socket.h>

#include "fair.h"
#include "error.h"
#include "conf.h"
#include "address.h"
#include "avlr.h"

#include "worker.h"

#define PRIO_MAX (~UINT32_C(0))

static worker_t worker_0 = {{0}};
static worker_t *worker_last = NULL;

static u32 worker_prio(const worker_t *wrk) {
      u32 capacity, nconn;

      unless(wrk) return PRIO_MAX;

      capacity = wrk->capacity;
      if(capacity) {
            nconn = wrk->nconn;
            if(nconn > UINT16_MAX)
                  nconn = UINT16_MAX;

            nconn += wrk->hysteresis;
            if(nconn > UINT16_MAX)
                  nconn = UINT16_MAX;

            return (nconn << 16) / capacity;
      }
      return PRIO_MAX;
}

static int worker_cmp_name(const worker_t *aw, const worker_t *bw) {
      assert(aw && bw);
      return strcasecmp(aw->name, bw->name);
}

static int worker_cmp_best(const worker_t *aw, const worker_t *bw) {
      avl_node_t *n;
      bool ab, bb;
      u32 ar, br;
      address_t *aa, *ba;
#if 0
      int ac, bc;
      sa_family_t af, bf;
#endif

      assert(aw && bw);

      /* an worker with no addresses always loses from a worker that
      ** does have addresses */
      n = aw->prio.head;
      if(n) {
            aa = n->item;
            n = bw->prio.head;
            if(!n)
                  return -1;
            ba = n->item;
      } else {
            if(bw->prio.head)
                  return 1;
            aa = ba = NULL;
      }

      /* at this point either both or neither of aa and ba are set */
      if(aa && ba) {
            ab = aa->disabled;
            bb = ba->disabled;
            if(!ab && bb)
                  return -1;
            if(ab && !bb)
                  return 1;

            ab = aa->dead;
            bb = ba->dead;
            if(!ab && bb)
                  return -1;
            if(ab && !bb)
                  return 1;
      }

      ar = worker_prio(aw);
      br = worker_prio(bw);
      if(ar < br)
            return -1;
      if(ar > br)
            return 1;

      if(aa && ba) {
            ar = aa->errors;
            br = ba->errors;
            if(ar < br)
                  return -1;
            if(ar > br)
                  return 1;
      }

#if 0
      /* if both have equal load, start with the strongest */
      ac = aw->capacity;
      bc = bw->capacity;
      if(ac > bc)
            return -1;
      if(ac < bc)
            return 1;

      /* slight preference for newer address families */
      if(aa && ba) {
            af = address_sa(aa)->sa_family;
            bf = address_sa(ba)->sa_family;
            if(af > bf)
                  return -1;
            if(af < bf)
                  return 1;
      }

      if(aw > bw)
            return -1;
      if(aw < bw)
            return 1;
#endif

      return 0;
}

static avl_tree_t workers_name = {NULL, NULL, NULL, (avl_compare_t)worker_cmp_name, NULL};
static avl_tree_t workers_best = {NULL, NULL, NULL, (avl_compare_t)worker_cmp_best, NULL};

unsigned int worker_count(void) {
      return avl_count(&workers_name);
}

static void worker_changed(worker_t *wrk) {
      unless(wrk) return;
      avl_unlink_node(&workers_best, &wrk->best);
      avl_insert_right(&workers_best, &wrk->best);
}

static void address_changed(address_t *addr, address_event_t evt) {
      worker_t *wrk;

      unless(addr) return;
      wrk = addr->data;
      unless(wrk) return;

      avl_unlink_node(&wrk->prio, &addr->prio);
      avl_insert_right(&wrk->prio, &addr->prio);

      if(evt == ADDRESS_EVENT_LOAD) {
            wrk->nconn++;
            wrk->hysteresis = 0;
            worker_changed(wrk);
            /* can't change two workers at the same time */
            if(wrk != worker_last) {
                  if(worker_last) {
                        worker_last->hysteresis = conf_Hysteresis;
                        worker_changed(worker_last);
                  }
                  worker_last = wrk;
            }
      } else if(evt == ADDRESS_EVENT_UNLOAD) {
            wrk->nconn--;
            worker_changed(wrk);
      }
}

worker_t *worker_new(const char *name) {
      worker_t *wrk;
      size_t len;
      assert(name);
      len = strlen(name);
      wrk = xalloc(sizeof *wrk + len);
      *wrk = worker_0;
      strcpy(wrk->name, name);
      wrk->hysteresis = conf_Hysteresis;
      avl_init_node(&wrk->node, wrk);
      avl_init_node(&wrk->best, wrk);
      avl_init_tree(&wrk->addr, (avl_compare_t)address_cmp_addr, NULL);
      avl_init_tree(&wrk->prio, (avl_compare_t)address_cmp_prio, NULL);
      avl_insert_node(&workers_name, &wrk->node);
      avl_insert_right(&workers_best, &wrk->best);
      syslog(LOG_INFO, "Added new worker node %s", name);
      return wrk;
}

worker_t *worker_byname(const char *name) {
      avl_node_t *n;
      worker_t *wrk;
      size_t len;
      unless(name) return NULL;
      len = strlen(name);
      wrk = alloca(sizeof *wrk + len);
      unless(wrk) return NULL;
      *wrk = worker_0;
      strcpy(wrk->name, name);
      n = avl_search(&workers_name, wrk);
      return n ? n->item : NULL;
}

void worker_update(worker_t *wrk, int capacity, const struct sockaddr *sa, socklen_t len) {
      address_t *addr;
      address_string_t str;
      int oldcapacity;

      unless(wrk) return;

      oldcapacity = wrk->capacity;
      addr = address_byaddr(&wrk->addr, sa, len);
      if(addr) {
            wrk->capacity = capacity;
            address_update(addr);
            if(oldcapacity != capacity)
                  worker_changed(wrk);
      } else if(address_authorized(sa, len)) {
            wrk->capacity = capacity;
            addr = address_new(sa, len, address_changed, wrk);
            assert(addr);
            avl_insert_node(&wrk->addr, &addr->node);
            avl_insert_right(&wrk->prio, &addr->prio);
            address_string_sa(&str, sa, len);
            syslog(LOG_INFO, "Added new worker address %s %s to %s",
                  str.host, str.serv, wrk->name);
            worker_changed(wrk);
      }
}

static void worker_print(const worker_t *wrk, const address_t *best) {
      avl_node_t *n;
      address_t *addr;

      unless(wrk) return;

      eprintf("\r%s: capacity:%d conn:%d score:%llu\033[K\n", wrk->name,
            (int)wrk->capacity, (int)wrk->nconn,
            (unsigned long long)worker_prio(wrk));

      for(n = wrk->addr.head; n; n = n->next) {
            addr = n->item;
            assert(addr);
            eprintf("%s\tconn:%d errors:%d dead:%d disabled:%d %s %s\n",
                  addr == best ? " *" : "",
                  (int)addr->nconn, (int)addr->errors,
                  (int)addr->dead, (int)addr->disabled,
                  addr->str.host, addr->str.serv);
      }
}

address_t *worker_bestaddress(void) {
      avl_node_t *n;
      worker_t *wrk;
      address_t *addr;
      n = workers_best.head;
      if(!n)
            return NULL;
      wrk = n->item;
      unless(wrk) return NULL;
      n = wrk->prio.head;
      if(!n)
            return NULL;
      addr = n->item;
      if(conf_Debug)
            for(n = workers_best.head; n; n = n->next)
                  worker_print(n->item, addr);
      return addr;
}

Generated by  Doxygen 1.6.0   Back to index