source file: /home/buildslave/tahoe/edgy/build/src/allmydata/storage/expirer.py
file stats: 232 lines, 232 executed: 100.0% covered
coverage versus previous test: 0 lines added, 0 lines removed
    1. import time, os, pickle, struct
    2. from crawler import ShareCrawler
    3. from shares import get_share_file
    4. from common import UnknownMutableContainerVersionError, \
    5.      UnknownImmutableContainerVersionError
    6. from twisted.python import log as twlog
    7. 
    class LeaseCheckingCrawler(ShareCrawler):
        """I examine the leases on all shares, determining which are still valid
        and which have expired. I can remove the expired leases (if so
        configured), and the share will be deleted when the last lease is
        removed.

        I collect statistics on the leases and make these available to a web
        status page, including::

        Space recovered during this cycle-so-far:
         actual (only if expiration_enabled=True):
          num-buckets, num-shares, sum of share sizes, real disk usage
          ('real disk usage' means we use stat(fn).st_blocks*512 and include any
           space used by the directory)
         what it would have been with the original lease expiration time
         what it would have been with our configured expiration time

        Prediction of space that will be recovered during the rest of this cycle
        Prediction of space that will be recovered by the entire current cycle.

        Space recovered during the last 10 cycles  <-- saved in separate pickle

        Shares/buckets examined:
         this cycle-so-far
         prediction of rest of cycle
         during last 10 cycles <-- separate pickle
        start/finish time of last 10 cycles  <-- separate pickle
        expiration time used for last 10 cycles <-- separate pickle

        Histogram of leases-per-share:
         this-cycle-to-date
         last 10 cycles <-- separate pickle
        Histogram of lease ages, buckets = 1day
         cycle-to-date
         last 10 cycles <-- separate pickle

        All cycle-to-date values remain valid until the start of the next cycle.

        """

        slow_start = 360 # wait 6 minutes after startup
        minimum_cycle_time = 12*60*60 # not more than twice per day

        # number of completed cycles kept in the history pickle
        HISTORY_LENGTH = 10

        def __init__(self, server, statefile, historyfile,
                     expiration_enabled, mode,
                     override_lease_duration, # used if expiration_mode=="age"
                     cutoff_date, # used if expiration_mode=="cutoff-date"
                     sharetypes):
            """Configure the lease checker.

            historyfile: path of the pickle that holds per-cycle history.
            expiration_enabled: if true, leases that are expired under the
              configured policy are cancelled; otherwise only statistics are
              gathered.
            mode: "age" or "cutoff-date".
            override_lease_duration: seconds, or None to use each lease's own
              expiration time (only used when mode=="age").
            cutoff_date: seconds-since-epoch; leases last granted/renewed
              before this are expired (only used when mode=="cutoff-date").
            sharetypes: share types (as reported by the share file's
              'sharetype') whose leases are allowed to expire.

            Raises ValueError for an unknown mode.
            """
            self.historyfile = historyfile
            self.expiration_enabled = expiration_enabled
            self.mode = mode
            self.override_lease_duration = None
            self.cutoff_date = None
            if self.mode == "age":
                assert isinstance(override_lease_duration, (int, type(None)))
                self.override_lease_duration = override_lease_duration # seconds
            elif self.mode == "cutoff-date":
                # isinstance() also rejects None
                assert isinstance(cutoff_date, int) # seconds-since-epoch
                self.cutoff_date = cutoff_date
            else:
                raise ValueError("GC mode '%s' must be 'age' or 'cutoff-date'" % mode)
            self.sharetypes_to_expire = sharetypes
            # NOTE(review): presumably called last because the base class
            # loads state and calls add_initial_state(), which needs
            # self.historyfile -- confirm against ShareCrawler.
            ShareCrawler.__init__(self, server, statefile)

        def _load_history(self):
            """Return the history dict (cyclenum -> dict) stored in
            self.historyfile, always closing the file."""
            f = open(self.historyfile, "rb")
            try:
                return pickle.load(f)
            finally:
                f.close()

        def _save_history(self, history):
            """Pickle 'history' into self.historyfile, replacing its contents.

            NOTE(review): the write is not atomic; a crash mid-write leaves a
            truncated pickle that _load_history() will fail to read.
            """
            f = open(self.historyfile, "wb")
            try:
                pickle.dump(history, f)
            finally:
                f.close()

        def add_initial_state(self):
            """Make sure the cycle-to-date state and the history file exist,
            filling in any keys missing from a previously-saved state."""
            # we fill ["cycle-to-date"] here (even though they will be reset in
            # self.started_cycle) just in case someone grabs our state before we
            # get started: unit tests do this
            so_far = self.create_empty_cycle_dict()
            self.state.setdefault("cycle-to-date", so_far)
            cycle_to_date = self.state["cycle-to-date"]
            # in case we upgrade the code while a cycle is in progress, update
            # the keys individually
            for k in so_far:
                cycle_to_date.setdefault(k, so_far[k])
            # ... including the individual counters inside "space-recovered",
            # which get_state() reads by key
            recovered = cycle_to_date["space-recovered"]
            for k, v in so_far["space-recovered"].items():
                recovered.setdefault(k, v)

            # initialize history
            if not os.path.exists(self.historyfile):
                self._save_history({}) # cyclenum -> dict

        def create_empty_cycle_dict(self):
            """Return a fresh, zeroed cycle-to-date dictionary."""
            recovered = self.create_empty_recovered_dict()
            so_far = {"corrupt-shares": [],
                      "space-recovered": recovered,
                      "lease-age-histogram": {}, # (minage,maxage)->count
                      "leases-per-share-histogram": {}, # leasecount->numshares
                      }
            return so_far

        def create_empty_recovered_dict(self):
            """Return a 'space-recovered' dict with every counter set to 0.

            Keys are <a>-<b>, <a>-<b>-mutable and <a>-<b>-immutable for
            a in (actual, original, configured, examined) and
            b in (buckets, shares, sharebytes, diskbytes).
            """
            recovered = {}
            for a in ("actual", "original", "configured", "examined"):
                for b in ("buckets", "shares", "sharebytes", "diskbytes"):
                    recovered[a+"-"+b] = 0
                    recovered[a+"-"+b+"-mutable"] = 0
                    recovered[a+"-"+b+"-immutable"] = 0
            return recovered

        def started_cycle(self, cycle):
            """Reset the cycle-to-date statistics at the start of a cycle."""
            self.state["cycle-to-date"] = self.create_empty_cycle_dict()

        def stat(self, fn):
            """Return os.stat(fn); a separate method so it can be overridden."""
            return os.stat(fn)

        def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
            """Examine every share in one bucket directory and update the
            bucket-level 'space-recovered' counters.

            Shares that fail to parse are logged, recorded in
            'corrupt-shares', and treated as kept under every policy.
            """
            bucketdir = os.path.join(prefixdir, storage_index_b32)
            s = self.stat(bucketdir)
            would_keep_shares = []
            wks = None

            for fn in os.listdir(bucketdir):
                try:
                    shnum = int(fn)
                except ValueError:
                    continue # non-numeric means not a sharefile
                sharefile = os.path.join(bucketdir, fn)
                try:
                    wks = self.process_share(sharefile)
                except (UnknownMutableContainerVersionError,
                        UnknownImmutableContainerVersionError,
                        struct.error):
                    twlog.msg("lease-checker error processing %s" % sharefile)
                    twlog.err()
                    which = (storage_index_b32, shnum)
                    self.state["cycle-to-date"]["corrupt-shares"].append(which)
                    wks = (1, 1, 1, "unknown")
                would_keep_shares.append(wks)

            sharetype = None
            if wks:
                # use the last share's sharetype as the buckettype
                sharetype = wks[3]
            rec = self.state["cycle-to-date"]["space-recovered"]
            self.increment(rec, "examined-buckets", 1)
            if sharetype:
                self.increment(rec, "examined-buckets-"+sharetype, 1)

            if not would_keep_shares:
                # no sharefiles were examined, so nothing in this bucket was
                # expired: don't report it as recovered under any policy (in
                # particular not as "actual" when expiration is disabled)
                return

            try:
                bucket_diskbytes = s.st_blocks * 512
            except AttributeError:
                bucket_diskbytes = 0 # no stat().st_blocks on windows
            # the bucket counts as recovered under a policy only when every
            # share in it would go away under that policy (flag == 0)
            for index, policy in ((0, "original"),
                                  (1, "configured"),
                                  (2, "actual")):
                if sum([keep[index] for keep in would_keep_shares]) == 0:
                    self.increment_bucketspace(policy, bucket_diskbytes,
                                               sharetype)

        def process_share(self, sharefilename):
            """Examine the leases on one share and update the cycle-to-date
            histograms and 'space-recovered' counters.

            When expiration is enabled, leases that are expired under the
            configured policy are cancelled.

            Returns [original, configured, actual, sharetype], where each of
            the first three entries is 1 if the share survives under that
            policy and 0 if it does not.

            Exceptions from parsing the share propagate to the caller
            (process_bucket handles the container-version errors and
            struct.error).
            """
            # first, find out what kind of a share it is
            sf = get_share_file(sharefilename)
            sharetype = sf.sharetype
            now = time.time()
            s = self.stat(sharefilename)
            # only leases on the configured share types are ever expired
            may_expire = sharetype in self.sharetypes_to_expire

            num_leases = 0
            num_valid_leases_original = 0
            num_valid_leases_configured = 0
            expired_leases_configured = []

            for li in sf.get_leases():
                num_leases += 1
                original_expiration_time = li.get_expiration_time()
                grant_renew_time = li.get_grant_renew_time_time()
                age = li.get_age()
                self.add_lease_age_to_histogram(age)

                #  expired-or-not according to original expiration time
                original_valid = original_expiration_time > now
                if original_valid:
                    num_valid_leases_original += 1

                #  expired-or-not according to our configured policy
                expired = False
                if self.mode == "age":
                    if self.override_lease_duration is not None:
                        # both are durations in seconds
                        expired = age > self.override_lease_duration
                    else:
                        # 'age' is a duration while original_expiration_time
                        # is an absolute timestamp, so they cannot be compared
                        # directly; without an override the lease is expired
                        # exactly when its original expiration time has passed
                        expired = not original_valid
                else:
                    assert self.mode == "cutoff-date"
                    expired = grant_renew_time < self.cutoff_date
                if not may_expire:
                    expired = False

                if expired:
                    expired_leases_configured.append(li)
                else:
                    num_valid_leases_configured += 1

            so_far = self.state["cycle-to-date"]
            self.increment(so_far["leases-per-share-histogram"], num_leases, 1)
            self.increment_space("examined", s, sharetype)

            would_keep_share = [1, 1, 1, sharetype]

            if self.expiration_enabled:
                for li in expired_leases_configured:
                    sf.cancel_lease(li.cancel_secret)

            if num_valid_leases_original == 0:
                would_keep_share[0] = 0
                self.increment_space("original", s, sharetype)

            if num_valid_leases_configured == 0:
                would_keep_share[1] = 0
                self.increment_space("configured", s, sharetype)
                if self.expiration_enabled:
                    would_keep_share[2] = 0
                    self.increment_space("actual", s, sharetype)

            return would_keep_share

        def increment_space(self, a, s, sharetype):
            """Add one share (stat result 's') to the '<a>-shares',
            '<a>-sharebytes' and '<a>-diskbytes' counters, plus the
            '-<sharetype>' variants when sharetype is set."""
            sharebytes = s.st_size
            try:
                # note that stat(2) says that st_blocks is 512 bytes, and that
                # st_blksize is "optimal file sys I/O ops blocksize", which is
                # independent of the block-size that st_blocks uses.
                diskbytes = s.st_blocks * 512
            except AttributeError:
                # the docs say that st_blocks is only on linux. I also see it on
                # MacOS. But it isn't available on windows.
                diskbytes = sharebytes
            so_far_sr = self.state["cycle-to-date"]["space-recovered"]
            self.increment(so_far_sr, a+"-shares", 1)
            self.increment(so_far_sr, a+"-sharebytes", sharebytes)
            self.increment(so_far_sr, a+"-diskbytes", diskbytes)
            if sharetype:
                self.increment(so_far_sr, a+"-shares-"+sharetype, 1)
                self.increment(so_far_sr, a+"-sharebytes-"+sharetype, sharebytes)
                self.increment(so_far_sr, a+"-diskbytes-"+sharetype, diskbytes)

        def increment_bucketspace(self, a, bucket_diskbytes, sharetype):
            """Add one bucket directory to the '<a>-buckets' and
            '<a>-diskbytes' counters, plus the '-<sharetype>' variants when
            sharetype is set."""
            rec = self.state["cycle-to-date"]["space-recovered"]
            self.increment(rec, a+"-diskbytes", bucket_diskbytes)
            self.increment(rec, a+"-buckets", 1)
            if sharetype:
                self.increment(rec, a+"-diskbytes-"+sharetype, bucket_diskbytes)
                self.increment(rec, a+"-buckets-"+sharetype, 1)

        def increment(self, d, k, delta=1):
            """Add 'delta' to d[k], treating a missing key as 0."""
            if k not in d:
                d[k] = 0
            d[k] += delta

        def add_lease_age_to_histogram(self, age):
            """Count one lease in the one-day-wide age bucket containing 'age'
            (seconds); buckets are keyed by (start, end) in seconds."""
            bucket_interval = 24*60*60
            # floor division keeps negative ages (e.g. from clock skew) in
            # their own negative bucket; int() truncation toward zero would
            # merge ages in (-1day, 0) into the [0, 1day) bucket
            bucket_number = int(age // bucket_interval)
            bucket_start = bucket_number * bucket_interval
            bucket_end = bucket_start + bucket_interval
            k = (bucket_start, bucket_end)
            self.increment(self.state["cycle-to-date"]["lease-age-histogram"], k, 1)

        def convert_lease_age_histogram(self, lah):
            """Return the histogram as a sorted list of
            (minage, maxage, count) tuples."""
            # convert { (minage,maxage) : count } into [ (minage,maxage,count) ]
            # since the former is not JSON-safe (JSON dictionaries must have
            # string keys).
            json_safe_lah = []
            for k in sorted(lah):
                (minage,maxage) = k
                json_safe_lah.append( (minage, maxage, lah[k]) )
            return json_safe_lah

        def finished_cycle(self, cycle):
            """Record this cycle's statistics in the history file and prune it
            to the most recent HISTORY_LENGTH cycles."""
            # add to our history state, prune old history
            h = {}

            start = self.state["current-cycle-start-time"]
            now = time.time()
            h["cycle-start-finish-times"] = (start, now)
            h["expiration-enabled"] = self.expiration_enabled
            h["configured-expiration-mode"] = (self.mode,
                                               self.override_lease_duration,
                                               self.cutoff_date,
                                               self.sharetypes_to_expire)

            s = self.state["cycle-to-date"]

            # state["lease-age-histogram"] is a dictionary (mapping
            # (minage,maxage) tuple to a sharecount), but we report
            # self.get_state()["lease-age-histogram"] as a list of
            # (min,max,sharecount) tuples, because JSON can handle that better.
            # We record the list-of-tuples form into the history for the same
            # reason.
            lah = self.convert_lease_age_histogram(s["lease-age-histogram"])
            h["lease-age-histogram"] = lah
            h["leases-per-share-histogram"] = s["leases-per-share-histogram"].copy()
            h["corrupt-shares"] = s["corrupt-shares"][:]
            # note: if ["space-recovered"] ever acquires an internal dict, this
            # copy() needs to become a deepcopy
            h["space-recovered"] = s["space-recovered"].copy()

            history = self._load_history()
            history[cycle] = h
            # drop everything but the newest HISTORY_LENGTH cycles
            for oldcycle in sorted(history.keys())[:-self.HISTORY_LENGTH]:
                del history[oldcycle]
            self._save_history(history)

        def get_state(self):
            """In addition to the crawler state described in
            ShareCrawler.get_state(), I return the following keys which are
            specific to the lease-checker/expirer. Note that the non-history keys
            (with 'cycle' in their names) are only present if a cycle is
            currently running. If the crawler is between cycles, it is
            appropriate to show the latest item in the 'history' key instead.
            Also note that each history item has all the data in the
            'cycle-to-date' value, plus cycle-start-finish-times.

             cycle-to-date:
              expiration-enabled
              configured-expiration-mode
              lease-age-histogram (list of (minage,maxage,sharecount) tuples)
              leases-per-share-histogram
              corrupt-shares (list of (si_b32,shnum) tuples, minimal verification)
              space-recovered

             estimated-remaining-cycle:
              # Values may be None if not enough data has been gathered to
              # produce an estimate.
              space-recovered

             estimated-current-cycle:
              # cycle-to-date plus estimated-remaining. Values may be None if
              # not enough data has been gathered to produce an estimate.
              space-recovered

             history: maps cyclenum to a dict with the following keys:
              cycle-start-finish-times
              expiration-enabled
              configured-expiration-mode
              lease-age-histogram
              leases-per-share-histogram
              corrupt-shares
              space-recovered

             The 'space-recovered' structure is a dictionary with the following
             keys:
              # 'examined' is what was looked at
              examined-buckets, examined-buckets-mutable, examined-buckets-immutable
              examined-shares, -mutable, -immutable
              examined-sharebytes, -mutable, -immutable
              examined-diskbytes, -mutable, -immutable

              # 'actual' is what was actually deleted
              actual-buckets, -mutable, -immutable
              actual-shares, -mutable, -immutable
              actual-sharebytes, -mutable, -immutable
              actual-diskbytes, -mutable, -immutable

              # would have been deleted, if the original lease timer was used
              original-buckets, -mutable, -immutable
              original-shares, -mutable, -immutable
              original-sharebytes, -mutable, -immutable
              original-diskbytes, -mutable, -immutable

              # would have been deleted, if our configured max_age was used
              configured-buckets, -mutable, -immutable
              configured-shares, -mutable, -immutable
              configured-sharebytes, -mutable, -immutable
              configured-diskbytes, -mutable, -immutable

            """
            progress = self.get_progress()

            state = ShareCrawler.get_state(self) # does a shallow copy
            state["history"] = self._load_history()

            if not progress["cycle-in-progress"]:
                del state["cycle-to-date"]
                return state

            # copy before replacing entries so the live state is not modified
            so_far = state["cycle-to-date"].copy()
            state["cycle-to-date"] = so_far

            lah = so_far["lease-age-histogram"]
            so_far["lease-age-histogram"] = self.convert_lease_age_histogram(lah)
            so_far["expiration-enabled"] = self.expiration_enabled
            so_far["configured-expiration-mode"] = (self.mode,
                                                    self.override_lease_duration,
                                                    self.cutoff_date,
                                                    self.sharetypes_to_expire)

            so_far_sr = so_far["space-recovered"]
            remaining_sr = {}
            remaining = {"space-recovered": remaining_sr}
            cycle_sr = {}
            cycle = {"space-recovered": cycle_sr}

            percentage = progress["cycle-complete-percentage"]
            if percentage > 0.0:
                pc = percentage / 100.0
                # linear extrapolation: the rest of the cycle is assumed to
                # look like the fraction seen so far
                m = (1-pc)/pc
            # the same key set that create_empty_recovered_dict() produces
            for k in self.create_empty_recovered_dict():
                if percentage > 0.0:
                    remaining_sr[k] = m * so_far_sr[k]
                    cycle_sr[k] = so_far_sr[k] + remaining_sr[k]
                else:
                    # nothing examined yet: no basis for an estimate
                    remaining_sr[k] = None
                    cycle_sr[k] = None

            state["estimated-remaining-cycle"] = remaining
            state["estimated-current-cycle"] = cycle
            return state