source file: /home/buildslave/tahoe/edgy/build/src/allmydata/scripts/common.py
file stats: 122 lines, 102 executed: 83.6% covered
coverage versus previous test: 0 lines added, 0 lines removed
1.
2. import os, sys, urllib
3. from twisted.python import usage
4.
5.
class BaseOptions:
    """Command-line flags shared by the tools, plus the version handlers.

    Declares the --quiet, --version and --version-and-path flags and
    implements the handlers for the two version flags.

    NOTE(review): the optFlags table and opt_* method names look like the
    twisted.python.usage conventions; presumably this class is combined with
    usage.Options elsewhere -- confirm against the subclasses.
    """
    # unit tests can override these to point at StringIO instances
    stdin = sys.stdin
    stdout = sys.stdout
    stderr = sys.stderr

    optFlags = [
        ["quiet", "q", "Operate silently."],
        ["version", "V", "Display version numbers and exit."],
        ["version-and-path", None, "Display version numbers and paths to their locations and exit."],
        ]

    def opt_version(self):
        """Write the package version string to self.stdout and exit(0)."""
        import allmydata
        # Go through self.stdout (not a bare print) so that the override
        # hook described above actually captures this output.
        self.stdout.write(allmydata.get_package_versions_string() + "\n")
        sys.exit(0)

    def opt_version_and_path(self):
        """Write versions and install paths to self.stdout and exit(0)."""
        import allmydata
        self.stdout.write(allmydata.get_package_versions_string(show_paths=True) + "\n")
        sys.exit(0)
27.
28.
class BasedirMixin:
    """Mixin that gathers node base directories from options and arguments.

    Reads self['basedir'] and self['multiple'], so it must be mixed into a
    dict-like options object. The 'basedir' option itself is not declared
    here. parseArgs() fills self.basedirs; postOptions() publishes the
    normalized list as self['basedirs'].
    """
    optFlags = [
        ["multiple", "m", "allow multiple basedirs to be specified at once"],
        ]

    def postOptions(self):
        """Require at least one basedir and store absolute, expanded paths.

        Raises usage.UsageError when no basedir was collected.
        """
        if not self.basedirs:
            raise usage.UsageError("<basedir> parameter is required")
        if self['basedir']:
            # the single-value option is superseded by the 'basedirs' list
            del self['basedir']
        self['basedirs'] = [os.path.abspath(os.path.expanduser(b)) for b in self.basedirs]

    def parseArgs(self, *args):
        """Collect basedirs from --basedir, positional args, or the default.

        With --multiple every positional argument is a basedir. Otherwise at
        most one positional argument is accepted (usage.UsageError is raised
        for more), and when neither an option nor an argument supplied a
        basedir a platform default is used.
        """
        from allmydata.util.assertutil import precondition
        self.basedirs = []
        if self['basedir']:
            precondition(isinstance(self['basedir'], (str, unicode)), self['basedir'])
            self.basedirs.append(self['basedir'])
        if self['multiple']:
            precondition(not [x for x in args if not isinstance(x, (str, unicode))], args)
            self.basedirs.extend(args)
            return

        if not args and not self.basedirs:
            # Nothing was given explicitly: fall back to the platform default.
            # Each default is computed once so that the value that gets
            # validated is the very value that gets appended.
            if sys.platform == 'win32':
                from allmydata.windows import registry
                rbdp = registry.get_base_dir_path()
                # if the lookup returns nothing, no default is added
                if rbdp:
                    precondition(isinstance(rbdp, (str, unicode)), rbdp)
                    self.basedirs.append(rbdp)
            else:
                default_basedir = os.path.expanduser("~/.tahoe")
                precondition(isinstance(default_basedir, (str, unicode)), default_basedir)
                self.basedirs.append(default_basedir)
        if len(args) > 0:
            precondition(isinstance(args[0], (str, unicode)), args[0])
            self.basedirs.append(args[0])
        if len(args) > 1:
            raise usage.UsageError("I wasn't expecting so many arguments")
66.
class NoDefaultBasedirMixin(BasedirMixin):
    """BasedirMixin variant that never falls back to a default basedir."""

    def parseArgs(self, *args):
        """Collect basedirs from --basedir and positional arguments only.

        Raises usage.UsageError when more than one positional argument is
        given without --multiple, or when no basedir was supplied at all.
        """
        from allmydata.util.assertutil import precondition
        # create-client won't default to --basedir=~/.tahoe
        text_types = (str, unicode)
        self.basedirs = []

        explicit = self['basedir']
        if explicit:
            precondition(isinstance(explicit, text_types), explicit)
            self.basedirs.append(explicit)

        if self['multiple']:
            non_strings = [a for a in args if not isinstance(a, text_types)]
            precondition(not non_strings, args)
            self.basedirs.extend(args)
        elif args:
            first = args[0]
            precondition(isinstance(first, text_types), first)
            self.basedirs.append(first)
            if len(args) > 1:
                raise usage.UsageError("I wasn't expecting so many arguments")

        if not self.basedirs:
            raise usage.UsageError("--basedir must be provided")
86.
DEFAULT_ALIAS = "tahoe"


def get_aliases(nodedir):
    """Load the alias table stored under nodedir/private.

    Two files are consulted, and either may be missing or unreadable
    (EnvironmentError is ignored for each):

    * private/root_dir.cap -- if non-empty, its contents become the
      DEFAULT_ALIAS entry.
    * private/aliases -- one "name: cap" entry per line; blank lines and
      lines starting with "#" are skipped. Entries here override the
      root_dir.cap entry of the same name.

    Every cap is passed through uri.from_string_dirnode(...).to_string().
    Returns a dict mapping alias name to that string. A non-comment line
    without a ":" raises ValueError from the tuple unpacking.
    """
    from allmydata import uri
    aliases = {}
    aliasfile = os.path.join(nodedir, "private", "aliases")
    rootfile = os.path.join(nodedir, "private", "root_dir.cap")
    try:
        f = open(rootfile, "r")
        try:
            rootcap = f.read().strip()
        finally:
            # close promptly instead of leaving it to garbage collection
            f.close()
        if rootcap:
            aliases[DEFAULT_ALIAS] = uri.from_string_dirnode(rootcap).to_string()
    except EnvironmentError:
        pass
    try:
        f = open(aliasfile, "r")
        try:
            lines = f.readlines()
        finally:
            f.close()
        for line in lines:
            line = line.strip()
            if line.startswith("#") or not line:
                continue
            name, cap = line.split(":", 1)
            # normalize it through the uri parser
            # NOTE(review): the original comment said "remove http: prefix,
            # urldecode"; presumably from_string_dirnode does that -- confirm.
            cap = cap.strip()
            aliases[name] = uri.from_string_dirnode(cap).to_string()
    except EnvironmentError:
        pass
    return aliases
115.
class DefaultAliasMarker:
    """Sentinel that get_alias() returns in place of a cap when the path
    names no alias and no default alias was requested (default=None)."""
118.
pretend_platform_uses_lettercolon = False # for tests
def platform_uses_lettercolon_drivename():
    """Return True when paths here may start with a drive letter and colon.

    True if sys.platform (case-insensitively) contains "win32" or "cygwin",
    or if the module-level pretend_platform_uses_lettercolon flag is set.
    The flag is read at call time.
    """
    platform_name = sys.platform.lower()
    for marker in ("win32", "cygwin"):
        if marker in platform_name:
            return True
    return bool(pretend_platform_uses_lettercolon)
126.
class UnknownAliasError(Exception):
    """Raised by get_alias() when a path names an alias that is not present
    in the alias table."""
129.
def _default_alias_target(aliases, path, default):
    # Resolve a path that names no alias of its own: hand back the marker
    # when no default was requested, otherwise the default alias's cap.
    if default is None:
        return DefaultAliasMarker, path
    if default not in aliases:
        # report this the same way as any other missing alias, rather than
        # letting a bare KeyError escape from the dict lookup
        raise UnknownAliasError("No alias specified, and the default '%s' alias doesn't exist. To create it, use 'tahoe create-alias %s'." % (default, default))
    return aliases[default], path


def get_alias(aliases, path, default):
    """Split "alias:sub/path" into (cap, "sub/path").

    Transforms "work:path/filename" into (aliases["work"], "path/filename").

    aliases -- dict mapping alias name to cap string.
    path    -- the user-supplied path; surrounding whitespace is stripped.
    default -- alias name to use when the path names none, or None, in which
               case DefaultAliasMarker is returned in place of a cap.

    Special cases, checked in this order:

    * Paths starting with "URI:" are returned as-is, to make it easy to
      access specific files/directories by their cap. The only way to give
      a sub-path is "URI:blah:./foo"; the ":./" sequence is removed.
    * No colon at all: the whole path belongs to the default alias.
    * A colon at index 1 with default=None on a drive-letter platform is
      treated as a local path such as C:\\dir\\file, not an alias named "C".
    * A "/" before the first colon (e.g. "foo/bar:7") means the colon is
      part of a file name, so the default alias applies.

    Raises UnknownAliasError if the named alias, or the default alias when
    one is needed, is not in aliases.
    """
    path = path.strip()
    if path.startswith("URI:"):
        # The only way to get a sub-path is to use URI:blah:./foo, and we
        # strip out the :./ sequence.
        sep = path.find(":./")
        if sep != -1:
            return path[:sep], path[sep+3:]
        return path, ""
    colon = path.find(":")
    if colon == -1:
        # no alias
        return _default_alias_target(aliases, path, default)
    if colon == 1 and default is None and platform_uses_lettercolon_drivename():
        # treat C:\why\must\windows\be\so\weird as a local path, not a tahoe
        # file in the "C:" alias
        return DefaultAliasMarker, path
    alias = path[:colon]
    if "/" in alias:
        # no alias, but there's a colon in a dirname/filename, like
        # "foo/bar:7"
        return _default_alias_target(aliases, path, default)
    if alias not in aliases:
        raise UnknownAliasError("Unknown alias '%s', please create it with 'tahoe add-alias' or 'tahoe create-alias'." % alias)
    return aliases[alias], path[colon+1:]
163.
def escape_path(path):
    """URL-quote each "/"-separated segment of path, keeping the slashes."""
    return "/".join(map(urllib.quote, path.split("/")))