144 lines
		
	
	
	
		
			5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			144 lines
		
	
	
	
		
			5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| from bundlewrap.items import Item, ItemStatus
 | |
| from bundlewrap.exceptions import BundleError
 | |
| from bundlewrap.utils.text import force_text, mark_for_translation as _
 | |
| from bundlewrap.utils.remote import PathInfo
 | |
| import types
 | |
| from shlex import quote
 | |
| 
 | |
| # Downloaded from https://github.com/bundlewrap/plugins/blob/master/item_download/items/download.py
 | |
| # No, we can't use plugins here, because bw4 won't support them anymore.
 | |
| 
 | |
| class Download(Item):
 | |
|     """
 | |
|     Download a file and verify its Hash.
 | |
|     """
 | |
|     BUNDLE_ATTRIBUTE_NAME = "downloads"
 | |
|     NEEDS_STATIC = [
 | |
|         "pkg_apt:",
 | |
|         "pkg_pacman:",
 | |
|         "pkg_yum:",
 | |
|         "pkg_zypper:",
 | |
|     ]
 | |
|     ITEM_ATTRIBUTES = {
 | |
|         'url': None,
 | |
|         'sha256': None,
 | |
|         'sha256_url': None,
 | |
|         'gpg_signature_url': None,
 | |
|         'gpg_pubkey_url': None,
 | |
|         'verifySSL': True,
 | |
|         'decompress': None,
 | |
|     }
 | |
|     ITEM_TYPE_NAME = "download"
 | |
|     REQUIRED_ATTRIBUTES = ['url']
 | |
| 
 | |
|     def __repr__(self):
 | |
|         return "<Download name:{}>".format(self.name)
 | |
| 
 | |
|     def __hash_remote_file(self, filename):
 | |
|         path_info = PathInfo(self.node, filename)
 | |
|         if not path_info.is_file:
 | |
|             return None
 | |
| 
 | |
|         if hasattr(path_info, 'sha256'):
 | |
|             return path_info.sha256
 | |
|         else:
 | |
|             """"pending pr so do it manualy"""
 | |
|             if self.node.os == 'macos':
 | |
|                 result = self.node.run("shasum -a 256 -- {}".format(quote(filename)))
 | |
|             elif self.node.os in self.node.OS_FAMILY_BSD:
 | |
|                 result = self.node.run("sha256 -q -- {}".format(quote(filename)))
 | |
|             else:
 | |
|                 result = self.node.run("sha256sum -- {}".format(quote(filename)))
 | |
|             return force_text(result.stdout).strip().split()[0]
 | |
| 
 | |
|     def fix(self, status):
 | |
|         if status.must_be_deleted:
 | |
|             # Not possible
 | |
|             pass
 | |
|         else:
 | |
|             decompress = self.attributes.get('decompress')
 | |
|             # download file
 | |
|             self.node.run("curl -L {verify}-s -- {url}{pipe} > {file}".format(
 | |
|                 pipe = ' | ' + decompress if decompress else '',
 | |
|                 verify="" if self.attributes.get('verifySSL', True) else "-k ",
 | |
|                 file=quote(self.name),
 | |
|                 url=quote(self.attributes['url'])
 | |
|             ))
 | |
| 
 | |
|     def cdict(self):
 | |
|         """This is how the world should be"""
 | |
|         cdict = {
 | |
|             'type': 'download',
 | |
|         }
 | |
| 
 | |
|         if self.attributes.get('sha256'):
 | |
|             cdict['sha256'] = self.attributes['sha256']
 | |
|         elif self.attributes.get('gpg_signature_url'):
 | |
|             cdict['verified'] = True
 | |
|         elif self.attributes.get('sha256_url'):
 | |
|             full_sha256_url = self.attributes['sha256_url'].format(url=self.attributes['url'])
 | |
|             cdict['sha256'] = force_text(
 | |
|                 self.node.run(f"curl -sL -- {quote(full_sha256_url)}").stdout
 | |
|             ).strip().split()[0]
 | |
|         else:
 | |
|             raise
 | |
| 
 | |
|         return cdict
 | |
| 
 | |
|     def sdict(self):
 | |
|         """This is how the world is right now"""
 | |
|         path_info = PathInfo(self.node, self.name)
 | |
|         if not path_info.exists:
 | |
|             return None
 | |
|         else:
 | |
|             sdict = {
 | |
|                 'type': 'download',
 | |
|             }
 | |
|             if self.attributes.get('sha256'):
 | |
|                 sdict['sha256'] = self.__hash_remote_file(self.name)
 | |
|             elif self.attributes.get('sha256_url'):
 | |
|                 sdict['sha256'] = self.__hash_remote_file(self.name)
 | |
|             elif self.attributes.get('gpg_signature_url'):
 | |
|                 full_signature_url = self.attributes['gpg_signature_url'].format(url=self.attributes['url'])
 | |
|                 signature_path = f'{self.name}.signature'
 | |
| 
 | |
|                 self.node.run(f"curl -sSL {self.attributes['gpg_pubkey_url']} | gpg --import -")
 | |
|                 self.node.run(f"curl -L {full_signature_url} -o {quote(signature_path)}")
 | |
|                 gpg_output = self.node.run(f"gpg --verify {quote(signature_path)} {quote(self.name)}").stderr
 | |
| 
 | |
|                 if b'Good signature' in gpg_output:
 | |
|                     sdict['verified'] = True
 | |
|                 else:
 | |
|                     sdict['verified'] = False
 | |
| 
 | |
|         return sdict
 | |
| 
 | |
|     @classmethod
 | |
|     def validate_attributes(cls, bundle, item_id, attributes):
 | |
|         if (
 | |
|             'sha256' not in attributes and
 | |
|             'sha256_url' not in attributes and
 | |
|             'gpg_signature_url'not in attributes
 | |
|         ):
 | |
|             raise BundleError(_(
 | |
|                 "at least one hash must be set on {item} in bundle '{bundle}'"
 | |
|             ).format(
 | |
|                 bundle=bundle.name,
 | |
|                 item=item_id,
 | |
|             ))
 | |
| 
 | |
|         if 'url' not in attributes:
 | |
|             raise BundleError(_(
 | |
|                 "you need to specify the url on {item} in bundle '{bundle}'"
 | |
|             ).format(
 | |
|                 bundle=bundle.name,
 | |
|                 item=item_id,
 | |
|             ))
 | |
| 
 | |
|     def get_auto_deps(self, items):
 | |
|         deps = []
 | |
|         for item in items:
 | |
|             # debian TODO: add other package manager
 | |
|             if item.ITEM_TYPE_NAME == 'pkg_apt' and item.name == 'curl':
 | |
|                 deps.append(item.id)
 | |
|         return deps
 |